Flume的安装与使用 -- flume自定义拦截器 -- tailDir + Memory + HDFS案例
文章目录
- 前言
- 一、安装
- 1、上传、解压、重命名
- 2、修改环境变量
- 3、修改配置文件
- 二、案例(tailDir + Memory + HDFS)
- 三、flume中的自定义拦截器
- 1、导包
- 2、导入打包插件
- 3、代码
前言
Flume – 一般用于采集日志数据
Sqoop – 一般用于采集数据库中的数据
一、安装
1、上传、解压、重命名
flume下载地址
提取码: 1234
cd /opt/modules
# 上传
# 解压
tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/installs
# 重命名
mv /opt/modules/apache-flume-1.9.0-bin /opt/modules/flume
2、修改环境变量
vi /etc/profireexport FLUME_HOME=/opt/installs/flume
export PATH=$PATH:$FLUME_HOME/bin
3、修改配置文件
cd /opt/installs/flume/conf
mv flume-env.sh.template flume-env.sh#添加如下配置:
export JAVA_HOME=/opt/installs/jdk
二、案例(tailDir + Memory + HDFS)
Source(数据来源) + channels(通道) + sinks(数据下沉的地方) 组成了flume的agent
tailDir 是用来监控多个文件夹下的多个文件的,只要文件内容发生变化,就会再次的进行数据的抽取
在/opt/installs/flume/conf下创建一个myconf文件夹
并在myconf下创建一个taildir-memory-hdfs.conf 文件
在 taildir-memory-hdfs.conf 文件添加如下内容
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.filegroups = f1
# . 代表的意思是一个任意字符 * 代表前面的字符出现0到多次
a1.sources.r1.filegroups.f1 = /home/scripts/datas/.*txt.*a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume3/logs
执行
cd /opt/installs/flume/conf/myconf/flume-ng agent -n a1 -c ./ -f taildir-memory-hdfs.conf -Dflume.root.logger=INFO,console
三、flume中的自定义拦截器
1、导包
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>
2、导入打包插件
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>3.1.1</version><configuration><!-- 禁止生成 dependency-reduced-pom.xml--><createDependencyReducedPom>false</createDependencyReducedPom></configuration><executions><!-- Run shade goal on package phase --><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><relocations><relocation><!-- 解决包冲突 进行转换--><pattern>com.google.protobuf</pattern><shadedPattern>shaded.com.google.protobuf</shadedPattern></relocation></relocations><artifactSet><excludes><exclude>log4j:*</exclude></excludes></artifactSet><filters><filter><!-- Do not copy the signatures in the META-INF folder.Otherwise, this might cause SecurityExceptions when using the JAR. --><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude></excludes></filter></filters><transformers><!-- 某些jar包含具有相同文件名的其他资源(例如属性文件)。 为避免覆盖,您可以选择通过将它们的内容附加到一个文件中来合并它们--><transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer"><resource>reference.conf</resource></transformer><transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"><mainClass>mainclass</mainClass></transformer></transformers></configuration></execution></executions></plugin></plugins></build>
3、代码
//实现Interceptor接口,并实现里面的四个方法
public class DemoInterceptor implements Interceptor{// 具体的逻辑代码,根据需求自行编写
}
// 在内部定义一个类,实现Builder接口public static class BuilderEvent implements Builder{@Overridepublic Interceptor build() {return new DemoInterceptor();}@Overridepublic void configure(Context context) {}}