当前位置: 首页 > news >正文

Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

章节内容

上一节完成了如下的内容:

  • 编写Agent Conf配置文件
  • 收集Hive数据
  • 汇聚到HDFS中
  • 测试效果

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。
之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。

  • 2C4G 编号 h121
  • 2C4G 编号 h122
  • 2C2G 编号 h123

在这里插入图片描述

文档推荐

除了官方文档以外,这里有一个写的很好的中文文档:
https://flume.liyifeng.org/

监控目录

业务需求

  • 想要监控指定目录 收集信息并上传到HDFS中

Source

选择 spooldir,因为 spooldir 能够保证数据不丢失,且能够进行断点续传,但是延迟较高,不能实时监控。

Channel

选择 memory

Sink

选择 HDFS

需要注意

  • 拷贝到 spool 目录下的文件 不可以再打开编辑
  • 无法监控子目录的文件夹变动
  • 被监控文件夹每500毫秒 扫描一次文件变动
  • 适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

配置文件

cd /opt/wzk/flume_test
vim flume_spooldir-hdfs.conf

我们需要写入如下内容

# Name the components on this agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3
# Describe/configure the source
a3.sources.r3.type = spooldir
# 注意这里的文件夹 换成自己的!!!
a3.sources.r3.spoolDir = /opt/wzk/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true# 忽略以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 500
# Describe the sink
a3.sinks.k3.type = hdfs
# 注意修改成你自己的IP!!!
a3.sinks.k3.hdfs.path = hdfs://h121.wzk.icu:9000/flume/upload/%Y%m%d/%H%M# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒500个Event,flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 500
# 设置文件类型
a3.sinks.k3.hdfs.fileType = DataStream
# 60秒滚动一次
a3.sinks.k3.hdfs.rollInterval = 60
# 128M滚动一次
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件滚动与event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# 最小冗余数
a3.sinks.k3.hdfs.minBlockReplicas = 1# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

启动Agent

$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file flume-spooldir-hdfs.conf \
-Dflume.root.logger=INFO,console

在这里插入图片描述

测试效果

Flume

cd /opt/wzk/upload
vim 1.txt

随便向其中写入一些内容,并保存,可以看到Flume已经有反应了。
在这里插入图片描述

HDFS

查看HDFS,也已经有内容了
在这里插入图片描述

采集双写

这里业务上需要:

  • Flume将数据写入本地
  • Flume将数据写入HDFS

分析实现

  • 需要多个Agent级联实现
  • Source选择taildir
  • Channel选择memory
  • 最终的Sink分别选择HDFS,file_roll

在这里插入图片描述

配置文件1

配置文件包含如下内容:

  • 1个 taildir source
  • 2个 memory channel
  • 2个 avro sink

新建文件

vim flume-taildir-avro.conf

写入如下内容

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating
# source
a1.sources.r1.type = taildir
# 记录每个文件最新消费位置
a1.sources.r1.positionFile = /root/flume/taildir_position.json
a1.sources.r1.filegroups = f1
# 备注:.*log 是正则表达式;这里写成 *.log 是错误的
a1.sources.r1.filegroups.f1 = /tmp/root/.*log
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux123
a1.sinks.k1.port = 9091
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux123
a1.sinks.k2.port = 9092
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

配置文件2

配置文件包含如下内容:

  • 1个 avro source
  • 1个 memory channel
  • 1个 hdfs sink

新建配置文件

vim flume-avro-hdfs.conf

写入如下的内容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = linux123
a2.sources.r1.port = 9091
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://linux121:8020/flume2/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 500个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 500
# 设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
# 60秒生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 0
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

配置文件3

配置文件包含如下内容:

  • 1个 avro source
  • 1个 memory channel
  • 1个 file_roll sink

新建配置文件

vim flume-avro-file.conf

写入如下的内容

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux123
a3.sources.r1.port = 9092
# Describe the sink
a3.sinks.k1.type = file_roll
# 目录需要提前创建好
a3.sinks.k1.sink.directory = /root/flume/output
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 10000
a3.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

启动Agent1

$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file ~/conf/flume-avro-file.conf \
-Dflume.root.logger=INFO,console &

启动Agent2

$FLUME_HOME/bin/flume-ng agent --name a2 \
--conf-file ~/conf/flume-avro-hdfs.conf \
-Dflume.root.logger=INFO,console &

启动Agent3

$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file ~/conf/flume-taildir-avro.conf \
-Dflume.root.logger=INFO,console &

Hive测试

hive -e "show databases;"

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • anaconda常用指令学习
  • 企业网络实验dhcp-snooping、ip source check,防非法dhcp服务器、自动获取ip(虚拟机充当DHCP服务器)、禁手动修改IP
  • 【爬虫】Python实现,模拟天眼查登录验证获取token
  • windows电脑的linux虚拟机连接电脑网络的方法
  • 项目收获总结--本地缓存方案选型及使用缓存的坑
  • 大数据基础:Hadoop之MapReduce重点架构原理
  • Stable Diffusion / huggingface 相关配置问题汇总
  • 移动端 火星坐标体系、百度坐标体系和全球坐标体系,该如何选择?
  • threadx netxduo stm32f407上实现http server
  • bug修复 修复修复修复
  • 【Git基本操作】添加文件 | 修改文件 | 及其各场景下.git目录树的变化
  • SpringBoot实战:处理全局异常
  • Apache Spark详解
  • CSS学习碎碎念之卡片展示
  • 《昇思25天学习打卡营第20天|onereal》
  • [js高手之路]搞清楚面向对象,必须要理解对象在创建过程中的内存表示
  • 《剑指offer》分解让复杂问题更简单
  • in typeof instanceof ===这些运算符有什么作用
  • node学习系列之简单文件上传
  • SpringCloud(第 039 篇)链接Mysql数据库,通过JpaRepository编写数据库访问
  • SpringCloud集成分布式事务LCN (一)
  • 测试如何在敏捷团队中工作?
  • 简单易用的leetcode开发测试工具(npm)
  • 开年巨制!千人千面回放技术让你“看到”Flutter用户侧问题
  • 普通函数和构造函数的区别
  • 前端
  • 全栈开发——Linux
  • 如何用vue打造一个移动端音乐播放器
  • 新书推荐|Windows黑客编程技术详解
  • 硬币翻转问题,区间操作
  • 责任链模式的两种实现
  • 正则学习笔记
  • No resource identifier found for attribute,RxJava之zip操作符
  • Python 之网络式编程
  • 阿里云移动端播放器高级功能介绍
  • 关于Kubernetes Dashboard漏洞CVE-2018-18264的修复公告
  • #LLM入门|Prompt#3.3_存储_Memory
  • %check_box% in rails :coditions={:has_many , :through}
  • (13):Silverlight 2 数据与通信之WebRequest
  • (C#)获取字符编码的类
  • (delphi11最新学习资料) Object Pascal 学习笔记---第14章泛型第2节(泛型类的类构造函数)
  • (Matalb时序预测)PSO-BP粒子群算法优化BP神经网络的多维时序回归预测
  • (附源码)c#+winform实现远程开机(广域网可用)
  • (附源码)springboot工单管理系统 毕业设计 964158
  • (十)Flink Table API 和 SQL 基本概念
  • (推荐)叮当——中文语音对话机器人
  • (循环依赖问题)学习spring的第九天
  • .htaccess 强制https 单独排除某个目录
  • .net core 使用js,.net core 使用javascript,在.net core项目中怎么使用javascript
  • .NET 解决重复提交问题
  • .net 前台table如何加一列下拉框_如何用Word编辑参考文献
  • .NET6 开发一个检查某些状态持续多长时间的类
  • .NET框架设计—常被忽视的C#设计技巧
  • /3GB和/USERVA开关
  • @property @synthesize @dynamic 及相关属性作用探究