Flume: Reading Data from Kafka

This article shows how to read data from Kafka with Flume, using a custom Flume source plus a helper class, a sample agent configuration, and a launch command.

Code (two source files: KafkaSource.java and the helper class KafkaSourceUtil.java):


/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *  
 * http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *******************************************************************************/
package org.apache.flume.source.kafka;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.conf.ConfigurationException;
import org.apache.flume.event.SimpleEvent;
import org.apache.flume.source.AbstractSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
 * A Flume source that reads messages from Kafka. I use this in our company's
 * production environment and its performance is good: a single source can read
 * over 100,000 messages per second from Kafka.<p>
 * <tt>zookeeper.connect: </tt> the ZooKeeper connection string used by Kafka.<p>
 * <tt>topic: </tt> the Kafka topic to read from.<p>
 * <tt>group.id: </tt> the consumer group id.<p>
 */
public class KafkaSource extends AbstractSource implements Configurable, PollableSource {
    private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
    private ConsumerConnector consumer;
    private ConsumerIterator<byte[], byte[]> it;
    private String topic;
    
    public Status process() throws EventDeliveryException {
        List<Event> eventList = new ArrayList<Event>();
        MessageAndMetadata<byte[],byte[]> message;
        Event event;
        Map<String, String> headers;
        String strMessage;
        try {
            if(it.hasNext()) {
                message = it.next();
                event = new SimpleEvent();
                headers = new HashMap<String, String>();
                headers.put("timestamp", String.valueOf(System.currentTimeMillis()));

                strMessage =  String.valueOf(System.currentTimeMillis()) + "|" + new String(message.message());
                log.debug("Message: {}", strMessage);

                event.setBody(strMessage.getBytes());
                //event.setBody(message.message());
                event.setHeaders(headers);
                eventList.add(event);
            }
            getChannelProcessor().processEventBatch(eventList);
            return Status.READY;
        } catch (Exception e) {
            // Log the full stack trace, then ask Flume to back off before retrying.
            log.error("KafkaSource EXCEPTION", e);
            return Status.BACKOFF;
        }
    }

    public void configure(Context context) {
        topic = context.getString("topic");
        if(topic == null) {
            throw new ConfigurationException("Kafka topic must be specified.");
        }
        try {
            this.consumer = KafkaSourceUtil.getConsumer(context);
        } catch (IOException e) {
            throw new ConfigurationException("Failed to create Kafka consumer", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ConfigurationException("Interrupted while creating Kafka consumer", e);
        }
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        // One stream (consumer thread) for the topic; process() polls its iterator.
        topicCountMap.put(topic, Integer.valueOf(1));
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        if(consumerMap == null) {
            throw new ConfigurationException("consumerMap is null");
        }
        List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
        if(topicList == null || topicList.isEmpty()) {
            throw new ConfigurationException("topicList is null or empty");
        }
        KafkaStream<byte[], byte[]> stream =  topicList.get(0);
        it = stream.iterator();
    }

    @Override
    public synchronized void stop() {
        consumer.shutdown();
        super.stop();
    }

}
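Note that process() delivers at most one event per call to the channel, and that it.hasNext() blocks until a message arrives. Below is a hedged sketch of an alternative loop that drains up to a batch of messages per call; batchSize is a hypothetical tuning knob, and the sketch assumes consumer.timeout.ms is set in the Kafka properties so that hasNext() throws kafka.consumer.ConsumerTimeoutException rather than blocking indefinitely.

    // Hypothetical variant of process() that drains up to batchSize messages
    // per call. Assumes consumer.timeout.ms is configured, so it.hasNext()
    // throws kafka.consumer.ConsumerTimeoutException when the topic is idle.
    public Status process() throws EventDeliveryException {
        List<Event> eventList = new ArrayList<Event>();
        int batchSize = 100; // hypothetical; keep <= the channel's transactionCapacity
        try {
            try {
                while (eventList.size() < batchSize && it.hasNext()) {
                    MessageAndMetadata<byte[], byte[]> message = it.next();
                    Event event = new SimpleEvent();
                    Map<String, String> headers = new HashMap<String, String>();
                    headers.put("timestamp", String.valueOf(System.currentTimeMillis()));
                    event.setHeaders(headers);
                    event.setBody(message.message());
                    eventList.add(event);
                }
            } catch (ConsumerTimeoutException e) {
                // No message arrived within consumer.timeout.ms; deliver what we have.
            }
            getChannelProcessor().processEventBatch(eventList);
            return Status.READY;
        } catch (Exception e) {
            log.error("KafkaSource EXCEPTION", e);
            return Status.BACKOFF;
        }
    }

The helper class KafkaSourceUtil used in configure() is the second source file: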

/*******************************************************************************
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *  
 * http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *******************************************************************************/
package org.apache.flume.source.kafka;


import java.io.IOException;
import java.util.Map;
import java.util.Properties;

import com.google.common.collect.ImmutableMap;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

import org.apache.flume.Context;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class KafkaSourceUtil {
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceUtil.class);

    public static Properties getKafkaConfigProperties(Context context) {
        log.info("context={}",context.toString());
        Properties props = new Properties();
        ImmutableMap<String, String> contextMap = context.getParameters();
        for (Map.Entry<String,String> entry : contextMap.entrySet()) {
            String key = entry.getKey();
            // Forward everything except Flume's own bookkeeping keys ("type",
            // "channel"/"channels") to the Kafka consumer configuration.
            if (!key.equals("type") && !key.equals("channel") && !key.equals("channels")) {
                props.setProperty(key, entry.getValue());
                log.info("key={},value={}", key, entry.getValue());
            }
        }
        return props;
    }

    public static ConsumerConnector getConsumer(Context context) throws IOException, InterruptedException {
        ConsumerConfig consumerConfig = new ConsumerConfig(getKafkaConfigProperties(context));
        return Consumer.createJavaConsumerConnector(consumerConfig);
    }
}
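
To sanity-check the helper in isolation, the fragment below builds a Flume Context by hand and opens a consumer. This is a minimal sketch: the class name KafkaSourceUtilDemo is made up for illustration, and the hard-coded values mirror the configuration file shown below.

import kafka.javaapi.consumer.ConsumerConnector;
import org.apache.flume.Context;

public class KafkaSourceUtilDemo {
    public static void main(String[] args) throws Exception {
        Context context = new Context();
        // Same values as in flume-kafka-file.properties below; group.id and
        // zookeeper.connect are the two properties ConsumerConfig requires.
        context.put("zookeeper.connect", "node3:2181,node4:2181,node5:2181");
        context.put("group.id", "test");
        ConsumerConnector consumer = KafkaSourceUtil.getConsumer(context);
        consumer.shutdown();
    }
}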



Configuration file (/etc/flume/conf/flume-kafka-file.properties):

agent_log.sources = kafka0
agent_log.channels = ch0
agent_log.sinks = sink0

agent_log.sources.kafka0.channels = ch0
agent_log.sinks.sink0.channel = ch0



agent_log.sources.kafka0.type = org.apache.flume.source.kafka.KafkaSource
agent_log.sources.kafka0.zookeeper.connect = node3:2181,node4:2181,node5:2181
agent_log.sources.kafka0.topic = kkt-test-topic
agent_log.sources.kafka0.group.id= test

agent_log.channels.ch0.type = memory
agent_log.channels.ch0.capacity = 2048
agent_log.channels.ch0.transactionCapacity = 1000


agent_log.sinks.sink0.type=file_roll
agent_log.sinks.sink0.sink.directory=/data/flumeng/data/test
agent_log.sinks.sink0.sink.rollInterval=300
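
A note on the channel settings: a memory channel buffers events in RAM, so any queued events are lost if the agent process dies. capacity = 2048 caps the total number of events the channel can hold, and transactionCapacity = 1000 caps the number of events per put or take transaction, so it should be at least as large as the largest batch the source or sink delivers.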

Launch command:

sudo su  -l -s /bin/bash  flume  -c '/usr/lib/flume/bin/flume-ng agent --conf /etc/flume/conf --conf-file /etc/flume/conf/flume-kafka-file.properties -name agent_log -Dflume.root.logger=INFO,console '
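
In this command, --conf points flume-ng at the configuration directory, --conf-file selects the agent definition, -name must match the agent_log prefix used in the properties file, and -Dflume.root.logger=INFO,console routes the agent's log output to the console. The surrounding sudo su -l -s /bin/bash flume -c '...' simply runs the agent as the flume user.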


Note: the code highlighted in red in the original post (the timestamp header and the strMessage concatenation in process()) prepends a timestamp to the original data. For example, a Kafka message hello consumed at epoch-millisecond 1400000000000 produces the event body 1400000000000|hello.

Versions: flume-1.4.0.2.1.1.0 + kafka_2.8.0-0.8.0

Reference: https://github.com/baniuyao/flume-kafka

Libraries used for compilation:

flume-ng-configuration-1.4.0.2.1.1.0-385
flume-ng-core-1.4.0.2.1.1.0-385
flume-ng-sdk-1.4.0.2.1.1.0-385
flume-tools-1.4.0.2.1.1.0-385
guava-11.0.2
kafka_2.8.0-0.8.0
log4j-1.2.15
scala-compiler
scala-library
slf4j-api-1.6.1
slf4j-log4j12-1.6.1
zkclient-0.3
zookeeper-3.3.4





Reposted from: https://www.cnblogs.com/ldxsuanfa/p/10888518.html
