当前位置: 首页 > news >正文

FLUME 安装配置及使用示例

下载安装

官网:http://flume.apache.org/

下载 apache-flume-1.9.0-bin.tar.gz

下载

$ tar xzvf apache-flume-1.9.0-bin.tar.gz -C /home/hadoop/local

$ cd /home/hadoop/local

$ ln -s apache-flume-1.9.0-bin flume

删除冲突的 Jar 包

$ cd /home/hadoop/local/flume/lib

$ rm -rf guava-11.0.2.jar

配置环境变量

$ vim /etc/profile.d/my_env.sh

FLUME_HOME=/home/hadoop/local/flume                                                                                                                          
PATH=$PATH:$FLUME_HOME/bin                              
export FLUME_HOME PATH

$ source /etc/proflie

Flume 组件选型

1)Source

(1)Taildir Source 相比 Exec Source、Spooling Directory Source 的优势

TailDir Source:断点续传、多目录。Flume1.6 以前需要自己自定义 Source 记录每次读取文件位置,实现断点续传。

Exec Source:可以实时搜集数据,但是在 Flume 不运行或者 Shell 命令出错的情况下,数据将会丢失;

Spooling Directory Source:监控目录,支持断点续传;

(2)batchSize 大小如何设置?

Event 1K 左右时,500 - 1000 合适(默认为 100)

(3)KafkaSource

从 Kafka 的 topic 中读取数据;

2)Channel

采用 Kafka Channel,省去了 Sink,提高了效率。KafkaChannel 数据存储在 Kafka 里面,所以数据时存储在磁盘中;

Kafka Channel

用 Kafka 作为一个数据缓冲,使用 Kafka Channel 数据已经写入到 Kafka 中了,就不需要再接 Kafka Sink 了;

可使用的场景:

  • 使用 Flume 的 Source 和 Sink - 它为 events 提供可靠且高可用的通道;
  • 使用 Flume 的 Source 和拦截器,但不用 Sink - 它允许将 Flume 的 events 写入 Kafka 的 topic,以供其他应用程序使用;
  • 使用 Flume 的 Sink,但不用 Source - 它是一种低延迟、容错的方式,可将 events 从 Kafka 发送到 Flume 的 Sink,如 HDFS、HBASE 或 Solr;

3)Sink

Kafka Sink

将消息发布到 Kafka 的 topic 中,相当于 Kafka 的生产者角色;

实例

1)Kafka Source

场景

通过 Kafka Source 从 Kafka 中获取数据,通过 Memory Channel,通过 Logger Sink 输出到日志文件

Kafka Source -- Memory Channel -- Logger Sink

配置

配置文件中的配置都参考官网文档 https://flume.apache.org/releases/content/1.10.1/FlumeUserGuide.html

$ mkdir /home/hadoop/local/flume/jobs

$ vim /home/hadoop/local/flume/jobs/kafkasource.conf

a1.sources = r1                                                                                                                                              
a1.channels = c1                                                                                                                                             
a1.sinks = k1                                                                                                                                                
                                                                                                                                                             
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource                                                                                               
a1.sources.r1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092                                                                                           
a1.sources.r1.kafka.topics = zsoft1                                                                                                                          
a1.sources.r1.kafka.consumer.group.id = flume                                                                                                                
a1.sources.r1.useFlumeEventFormat = false                                                                                                                    
                                                                                                                                                             
a1.channels.c1.type = memory                                                                                                                                 
a1.channels.c1.capacity = 10000                                                                                                                              
a1.channels.c1.transactionCapacity = 1000                                                                                                               
                                                                                                                                                             
a1.sinks.k1.type = logger                                                                                                                                    
                                                                                                                                                             
a1.sources.r1.channels = c1                                                                                                                                  
a1.sinks.k1.channel = c1   

运行 Flume

$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasource.conf -n a1 -Dflume.root.logger=INFO,console

测试

再在另外一个 shell 中启动一个生产者:

$ kafka-console-producer.sh --topic zsoft1 --broker-list ns1:9092

输入:

> hello

> zhangsan

> lisi

在启动 flume-ng 的窗口中看到打印出了相关信息:

2022-09-01 17:02:59,624 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { header
s:{topic=zsoft1, partition=1, offset=4, timestamp=1662022979348} body: 68 65 6C 6C 6F                                  hello }                               
2022-09-01 17:03:09,631 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { header
s:{topic=zsoft1, partition=0, offset=6, timestamp=1662022988636} body: 7A 68 61 6E 67 73 61 6E                         zhangsan }                            
2022-09-01 17:03:09,632 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { header
s:{topic=zsoft1, partition=1, offset=5, timestamp=1662022988636} body: 6C 69 73 69                                     lisi } 

2)Kafka Sink

场景

监控端口数据输入,通过 Memory Channel,通过 Kafka Sink 输出到 Kafka

netcat Source -- Memory Channel -- Kafka Sink

配置

$ vim /home/hadoop/local/flume/jobs/kafkasink.conf

a1.sources = r1                                                                                                                                              
a1.channels = c1                                                                                                                                             
a1.sinks = k1                                                                                                                                                
                                                                                                                                                             
a1.sources.r1.type = netcat                                                                                               
a1.sources.r1.bind = 0.0.0.0                                                                                        
a1.sources.r1.port = 6666                                                                                                                
                                                                                                                                                             
a1.channels.c1.type = memory                                                                                                                                 
a1.channels.c1.capacity = 10000                                                                                                                              
a1.channels.c1.transactionCapacity = 1000                                                                                                               
                                                                                                                                                             
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
a1.sinks.k1.kafka.topic = zsoft1
a1.sinks.k1.kafka.producer.acks = -1
a1.sinks.k1.useFlumeEventFormat = false    # true:保留 header,会将 header 信息也存入 kafka 中
                                                                                                                                                             
a1.sources.r1.channels = c1                                                                                                                                  
a1.sinks.k1.channel = c1   

运行 FLUME

$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasink.conf -n a1 -Dflume.root.logger=INFO,console

测试

在另一个 shell 中启动一个 Kafka 的消费者:

$ kafka-console-consumer.sh --topic zsoft1 --bootstrap-server ns1:9092

在另一个 shell 中启动一个 nc(在 ns1 服务器上):

$ nc localhost 6666

在其中输入:

hello

zhangsan

lisi

在 Kafka 消费者终端中会打印出输入的内容:

hello                                                                                                                                                        
zhangsan                                                                                                                                                     
lisi 

3)复杂 Kafka Sink(将数据发往多 topic)

场景

从 netcat source 获取数据,通过拦截器,通过 multiplexing channel selector 选择器,不同的内容推送到不同的 memory channel 通道中,并进入不同的 Kafka Sink,写入到对应的 Kafka 主题中;

上面这个场景可通过 Kafka Sink 进行简化:

Kafka Sink 可通过判断 event header 中的 topic 字段值放入对应的 topic 中;这样就不用在进入 channel 时候就分到不同的 channel 处理了;拦截器还需要有,在拦截器中要加 header 信息,选择器也可以使用简单的 replicating channel selector;

netcat Source -- 拦截器 -- replicating channel selector -- memory Channel -- Kafka Sink

拦截器工程

用 IDEA 创建一个 Maven 工程 flume-interceptor

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.zsoft</groupId>
    <artifactId>flume-interceptor</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.62</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

新建拦截器:

com.zsoft.flume.interceptor/EventHeaderInterceptor.java

package com.zsoft.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

public class EventHeaderInterceptor implements Interceptor {
    public void initialize() {

    }

    /*
     * 拦截方法
     */
    public Event intercept(Event event) {
        //1.获取event的headers
        Map<String, String> headers = event.getHeaders();
        //2.获取event的body
        byte[] body1 = event.getBody();
        String body = new String(body1, StandardCharsets.UTF_8);
        //3.判断是否包含"zhangsan" "lisi"
        if (body.contains("zhangsan")){
            headers.put("topic","zhangsan");
        }else if(body.contains("lisi")){
            headers.put("topic","lisi");
        }
        return event;
    }

    public List<Event> intercept(List<Event> events) {
        for (Event event : events) {
            intercept(event);
        }
        return events;
    }

    public void close() {

    }

    public static class MyBuilder implements Builder{

        @Override
        public Interceptor build() {
            return new EventHeaderInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

对项目打包:

Maven:clean package

将打包后项目 target 目录下的 flume-interceptor-1.0-SNAPSHOT.jar 拷贝到 ns1 服务器 /home/hadoop/local/flume/lib/ 下

配置

$ vim /home/hadoop/local/flume/jobs/kafkasink-topics.conf

a1.sources = r1                                                                                                                                              
a1.channels = c1                                                                                                                                             
a1.sinks = k1                                                                                                                                                
                                                                                                                                                             
a1.sources.r1.type = netcat                                                                                               
a1.sources.r1.bind = 0.0.0.0                                                                                        
a1.sources.r1.port = 6666

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.zsoft.flume.interceptor.EventHeaderInterceptor$MyBuilder

a1.channels.c1.type = memory                                                                                                                                 
a1.channels.c1.capacity = 10000                                                                                                                              
a1.channels.c1.transactionCapacity = 1000                                                                                                               

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
a1.sinks.k1.kafka.topic = other
a1.sinks.k1.kafka.producer.acks = -1
a1.sinks.k1.useFlumeEventFormat = false    # true:保留 header,会将 header 信息也存入 kafka 中

a1.sources.r1.channels = c1                                                                                                                                  
a1.sinks.k1.channel = c1   

启动 Flume

$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkasink-topics.conf -n a1 -Dflume.root.logger=INFO,console

测试

新开 3 个 shell 窗口,分别消费 Kafka 的 3 个 topic:zhangsan、lisi、other:

窗口1:

$ kafka-console-consumer.sh --topic zhangsan --bootstrap-server ns1:9092

窗口2:

$ kafka-console-consumer.sh --topic lisi --bootstrap-server ns1:9092

窗口3:

$ kafka-console-consumer.sh --topic other --bootstrap-server ns1:9092

再在新的 shell 窗口用 nc 打开端口:

$ nc ns1 6666

输入 hello,在窗口 3 监控 other topic 的输出中打印出 hello

输入 zhangsan,在窗口 1 监控 zhangsan topic 的输出中打印出 zhangsan

输入 lisi,在窗口 2 监控 lisi topic 的输出中打印出 lisi

4)Kafka Channel

场景

netcat Source -- Kafka Channel -- Logger Sink

这种既有 Source 又有 Sink 的 Kafka Channel 使用的比较少

配置

$ vim /home/hadoop/local/flume/jobs/kafkachannel.conf

a1.sources = r1
a1.channels = c1
a1.sinks = k1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
a1.channels.c1.kafka.topic = zsoft1
a1.channels.c1.parseAsFlumeEvent = false

a1.sinks.k1.type = logger

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动 Flume

$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkachannel.conf -n a1 -Dflume.root.logger=INFO,console

测试

在另外 shell 启动 nc:

$ nc ns1 6666

输入:

aaaaaa

bbbbbb

在 Flume 的进程中打印出:

aaaaaa

bbbbbb

5)Kafka Channel no Source

场景

没有 Source,Kafka Channel 直接从 Kafka 获取数据

Kafka Channel -- Logger Sink

配置

$ vim /home/hadoop/local/flume/jobs/kafkachannelnosource.conf

a1.channels = c1
a1.sinks = k1

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
a1.channels.c1.kafka.topic = zsoft1
a1.channels.c1.parseAsFlumeEvent = false

a1.sinks.k1.type = logger

a1.sinks.k1.channel = c1

启动 Flume

$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkachannelnosource.conf -n a1 -Dflume.root.logger=INFO,console

测试

在另一个 shell 中启动一个 Kafka 生产者:

$ kafka-console-producer.sh --topic zsoft1 --broker-list ns1:9092

> hello

在 Flume 终端打印出内容:

2022-09-02 14:25:20,222 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { header s:{} body: 68 65 6C 6C 6F hello }

6)Kafka Channel no Sink

场景

把数据直接写道 Kafka

netcat Source -- Kafka Channel

配置

$ /home/hadoop/local/flume/jobs/kafkachannelnosink.conf

a1.sources = r1
a1.channels = c1

a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666

a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = ns1:9092,ns2:9092,ns3:9092
a1.channels.c1.kafka.topic = zsoft1
a1.channels.c1.parseAsFlumeEvent = false

a1.sources.r1.channels = c1

启动 Flume

$ flume-ng agent -c $FLUME_HOME/conf -f $FLUME_HOME/jobs/kafkachannelnosink.conf -n a1 -Dflume.root.logger=INFO,console

测试

在另一个 shell 窗口中启动一个 Kafka 消费者

$ kafka-console-consumer.sh --topic zosft1 --bootstrap-server ns1:9092

在另一个 shell 窗口中打开 nc

$ nc ns1 6666

输入:

hello

在 Kafka 消费者 console 中打印出

hello

相关文章:

  • 高等教育心理学:学生的个性与社会性的发展
  • Web前端:Angular的优缺点以及何时使用Angular?
  • Redis基础与高可用集群架构进阶详解
  • 如何翻译图片上的英文?建议收藏这三个方法
  • 微信网页开发——JS-SDK接入以及微信二次分享图标和标题丢失
  • 外汇监管机构哪个好?怎么选择?
  • 基于Springboot+vue的电影票预定管理系统 elementui
  • 标签类目体系(面向业务的数据资产设计方法论)-读书笔记5
  • Python统计学01——数据可视化
  • BUUCTF:8月做题记录
  • iNFTnews | Web3正在推动一个41万亿元的市场?
  • Linux(Ubuntu)用户与用户组(入门必看)
  • 实测 ubuntu 20.04 使用 lidar_imu_calib 功能包 进行 激光雷达与imu标定
  • OSI与TCP/IP与的体系结构的比较
  • 多线程之ThreadPoolExecutor
  • 2017-09-12 前端日报
  • node和express搭建代理服务器(源码)
  • react 代码优化(一) ——事件处理
  • React 快速上手 - 07 前端路由 react-router
  • SpringBoot 实战 (三) | 配置文件详解
  • 聊聊directory traversal attack
  • 聊聊flink的TableFactory
  • 驱动程序原理
  • 设计模式 开闭原则
  • 使用API自动生成工具优化前端工作流
  • 原生js练习题---第五课
  • 【云吞铺子】性能抖动剖析(二)
  • ​如何防止网络攻击?
  • ​如何在iOS手机上查看应用日志
  • #考研#计算机文化知识1(局域网及网络互联)
  • $refs 、$nextTic、动态组件、name的使用
  • (06)金属布线——为半导体注入生命的连接
  • (C++20) consteval立即函数
  • (pojstep1.1.1)poj 1298(直叙式模拟)
  • (二)基于wpr_simulation 的Ros机器人运动控制,gazebo仿真
  • (附源码)ssm教师工作量核算统计系统 毕业设计 162307
  • (更新)A股上市公司华证ESG评级得分稳健性校验ESG得分年均值中位数(2009-2023年.12)
  • (接口自动化)Python3操作MySQL数据库
  • (三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
  • (一)使用Mybatis实现在student数据库中插入一个学生信息
  • (转载)(官方)UE4--图像编程----着色器开发
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。
  • **登录+JWT+异常处理+拦截器+ThreadLocal-开发思想与代码实现**
  • .cn根服务器被攻击之后
  • .NET 4 并行(多核)“.NET研究”编程系列之二 从Task开始
  • .NET Core中Emit的使用
  • .net Signalr 使用笔记
  • .net 发送邮件
  • .NET3.5下用Lambda简化跨线程访问窗体控件,避免繁复的delegate,Invoke(转)
  • .net和jar包windows服务部署
  • ::before和::after 常见的用法
  • @Builder用法
  • @DateTimeFormat 和 @JsonFormat 注解详解
  • [2021ICPC济南 L] Strange Series (Bell 数 多项式exp)
  • [acm算法学习] 后缀数组SA