当前位置: 首页 > news >正文

【Flume Kafaka实战】Using Kafka with Flume

一 目标

在Cloudera Manager中创建两个Flume的Agent,Agent1从local file中获取内容,写入到kafka的队列中。Agent2以Agent1的sink作为source,将数据从kafka中读取出来,写入到HDFS中。

二 实战

2.1 Kafka Sink

第一步,在Cloudera Manager中安装Flume,安装时指定两个Agent。这一步很简单。

第二步,创建一个新Role Group。默认情况下,所有的Agent都处于一个叫Agent Default Group的角色组中,处于同一角色组中的Agent共享相同的配置。但是在我们这个例子中,两个Agent要完成不同的工作,需要不同的配置。所有新建一个Role Group,并把其中一个Agent移到到这个新的Group中,如下图所示。

第三步,分别编辑两个Agent的配置文件,我的第一个Agent名字为file2Kafka,配置文件内容如下。不难看出,这个配置的source就是去tail一个本地文件,然后写入到kafka的消息队列中。

即:Kafka Sink

# Name the components on this agent
file2Kafka.sources = file2Kafka_source
file2Kafka.sinks = file2Kafka_sink
file2Kafka.channels = file2Kafka_channel# Describe/configure the source
file2Kafka.sources.file2Kafka_source.type = exec
file2Kafka.sources.file2Kafka_source.command = tail -F /home/demo/flume-exec.txt# Describe the sink
file2Kafka.sinks.file2Kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
# topic前不加kafka
file2Kafka.sinks.file2Kafka_sink.topic = flumetest
file2Kafka.sinks.file2Kafka_sink.kafka.bootstrap.servers= slave1:9092,slave2:9092
file2Kafka.sinks.file2Kafka_sink.kafka.flumeBatchSize= 20# Use a channel which buffers events in memory
file2Kafka.channels.file2Kafka_channel.type = memory
file2Kafka.channels.file2Kafka_channel.capacity = 1000
file2Kafka.channels.file2Kafka_channel.transactionCapacity = 1000# Bind the source and sink to the channel
file2Kafka.sources.file2Kafka_source.channels = file2Kafka_channel
file2Kafka.sinks.file2Kafka_sink.channel = file2Kafka_channel

2.2 Kafka Source

第二Agent的名字是kafka2Hdfs,配置文件如下。这个配置的内容就是把Agent1中写到kafka的数据读出来,然后写入到HDFS中。注意hdfs.path这个配置,由于在Cloudera Manager中,Flume知道HDFS相关的配置,所以无需去加入hdfs://my-cluster这样的协议前缀。

# Name the components on this agent
kafka2Hdfs.sources = kafka2Hdfs_source
kafka2Hdfs.sinks = kafka2Hdfs_sink
kafka2Hdfs.channels = kafka2Hdfs_channel# Describe/configure the source
kafka2Hdfs.sources.kafka2Hdfs_source.type = org.apache.flume.source.kafka.KafkaSource
kafka2Hdfs.sources.kafka2Hdfs_source.batchSize = 10
kafka2Hdfs.sources.kafka2Hdfs_source.batchDurationMillis = 1000
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.topics = flumetest
kafka2Hdfs.sources.kafka2Hdfs_source.kafka.consumer.group.id = flume# Describe the sink
kafka2Hdfs.sinks.kafka2Hdfs_sink.type = hdfs
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.path = /flume/
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.fileType = DataStream
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.filePrefix=sxt
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollCount=0
kafka2Hdfs.sinks.kafka2Hdfs_sink.hdfs.rollInterval=0# Use a channel which buffers events in memory
kafka2Hdfs.channels.kafka2Hdfs_channel.type = memory
kafka2Hdfs.channels.kafka2Hdfs_channel.capacity = 1000
kafka2Hdfs.channels.kafka2Hdfs_channel.transactionCapacity = 100# Bind the source and sink to the channel
kafka2Hdfs.sources.kafka2Hdfs_source.channels = kafka2Hdfs_channel
kafka2Hdfs.sinks.kafka2Hdfs_sink.channel = kafka2Hdfs_channel

整个配置完成之后,Cloudera Manager中的界面如下图:

在运行中可能会出现一些目录读写的权限问题,需要去修改hdfs中相关目录的权限。比如我的配置中,数据是写到/flume这个目录下的,这个目录我是用root用户去创建的,但flume运行是使用一个叫flume的用户名来运行的,所以用hdfs dfs -chmod 777 /flume把这个目录的读写权限放开了。

这是一个例子,主要演示如何在cloudera manager中把两个flume的agent串联在一起使用。在现实的生产中,如果需要把一个文本数据通过kakfa写入到hdfs中,更合理的做法是使用一个agent,把kafka作为channel来使用。具体可以参考https://www.cloudera.com/documentation/kafka/latest/topics/kafka_flume.html

2.3 Kafka Channel

# Name the components on this agent
kafkaCh.sources = src_1_file
kafkaCh.channels = ch_1_kafka
kafkaCh.sinks = sink_1_hdfs# Describe/configure the source
kafkaCh.sources.src_1_file.type = exec
kafkaCh.sources.src_1_file.command = tail -F /home/demo/flume-exec.txt# Define a kafka channel
kafkaCh.channels.ch_1_kafka.type = org.apache.flume.channel.kafka.KafkaChannel
kafkaCh.channels.ch_1_kafka.kafka.bootstrap.servers = slave1:9092,slave2:9092
kafkaCh.channels.ch_1_kafka.kafka.topic = kafka_channel
kafkaCh.channels.ch_1_kafka.kafka.consumer.group.id = flume-consumer# Describe the sink
kafkaCh.sinks.sink_1_hdfs.type = hdfs
kafkaCh.sinks.sink_1_hdfs.hdfs.path = /flume/kafka/channel
kafkaCh.sinks.sink_1_hdfs.hdfs.fileType = DataStream
kafkaCh.sinks.sink_1_hdfs.hdfs.filePrefix=sxt
kafkaCh.sinks.sink_1_hdfs.hdfs.rollCount=0
kafkaCh.sinks.sink_1_hdfs.hdfs.rollInterval=0# Bind the source and sink to the channel
kafkaCh.sources.src_1_file.channels = ch_1_kafka
kafkaCh.sinks.sink_1_hdfs.channel = ch_1_kafka

将上面两个Agent放在一个Agent中,用Kafka Channel实现。

注意:hdfs.path 必须存在,且有权限进行操作

相关文章:

  • 4. 数据结构: 对象和数组
  • 如何使用GLib的单向链表GSList
  • UE学习篇ContentExample解读------Blueprint_Communication-下
  • ELK-05-skywalking监控SpringCloud服务日志
  • Qt/C++如何选择使用哪一种地图内核/不同地图的优缺点/百度高德腾讯地图/天地图/谷歌地图
  • AI运用在营销领域的经典案例及解析
  • 单片机长短按简单实现
  • Unity 与虚幻引擎对比:两大游戏开发引擎的优劣分析
  • 宝塔搭建nextcould 30docker搭建onlyoffic8.0
  • blue-crab
  • VS Code 配置 Anaconda Python 环境
  • 观察者模式(发布-订阅模式)
  • RTMP、RTSP直播播放器的低延迟设计探讨
  • 搬砖6、Python函数和模块的使用
  • python股票因子,交易所服务器宕机,量化交易程序怎么应对
  • Apache Pulsar 2.1 重磅发布
  • Apache Zeppelin在Apache Trafodion上的可视化
  • in typeof instanceof ===这些运算符有什么作用
  • LeetCode刷题——29. Divide Two Integers(Part 1靠自己)
  • mongo索引构建
  • Python_OOP
  • select2 取值 遍历 设置默认值
  • Traffic-Sign Detection and Classification in the Wild 论文笔记
  • 关于 Linux 进程的 UID、EUID、GID 和 EGID
  • 回流、重绘及其优化
  • 聊聊directory traversal attack
  • 全栈开发——Linux
  • 深度学习中的信息论知识详解
  • 使用docker-compose进行多节点部署
  • 在electron中实现跨域请求,无需更改服务器端设置
  • 怎么把视频里的音乐提取出来
  • ionic入门之数据绑定显示-1
  • Java性能优化之JVM GC(垃圾回收机制)
  • 好程序员大数据教程Hadoop全分布安装(非HA)
  • ​LeetCode解法汇总1276. 不浪费原料的汉堡制作方案
  • ​TypeScript都不会用,也敢说会前端?
  • ​经​纬​恒​润​二​面​​三​七​互​娱​一​面​​元​象​二​面​
  • ​马来语翻译中文去哪比较好?
  • # linux 中使用 visudo 命令,怎么保存退出?
  • # Redis 入门到精通(七)-- redis 删除策略
  • ###C语言程序设计-----C语言学习(6)#
  • #NOIP 2014# day.1 生活大爆炸版 石头剪刀布
  • #宝哥教你#查看jquery绑定的事件函数
  • $L^p$ 调和函数恒为零
  • (2/2) 为了理解 UWP 的启动流程,我从零开始创建了一个 UWP 程序
  • (二)延时任务篇——通过redis的key监听,实现延迟任务实战
  • (附源码)python旅游推荐系统 毕业设计 250623
  • (回溯) LeetCode 40. 组合总和II
  • (译) 函数式 JS #1:简介
  • (中等) HDU 4370 0 or 1,建模+Dijkstra。
  • (转载)CentOS查看系统信息|CentOS查看命令
  • .equal()和==的区别 怎样判断字符串为空问题: Illegal invoke-super to void nio.file.AccessDeniedException
  • .NET / MSBuild 扩展编译时什么时候用 BeforeTargets / AfterTargets 什么时候用 DependsOnTargets?
  • .net 4.0发布后不能正常显示图片问题
  • .net core 3.0 linux,.NET Core 3.0 的新增功能