当前位置: 首页 > news >正文

Eclipse+kafka集群 实例源码

  1. 首先调试zookeeper与kafka正常配置,并达到集群功能,这个是前提条件。
  2. 建立Java工程,拷贝kafka的lib文件到工程目录下。
  3. Lib下有些asc文件,一定要去除,否则java编译错误
  4. 提示poll方法错误,最后找到原因为本机安装了jre7和jre8,默认加载jre7类包,改为jre8包后,系统调试通过。
  5. listeners必须要配置成Ip地址的形式 例如: listeners=PLAINTEXT://192.168.5.132:9092,我就是这个地方配置出错。
  6. Producer类如下:
package kafkamanager;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class producer {

    public producer() {
        // TODO Auto-generated constructor stub
    }

    public static void main(String[] args) throws InterruptedException {
        // TODO Auto-generated method stub
        
        
             Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.5.132:9092,192.168.5.133:9092,192.168.5.135:9092");
            
            //broker消息确认的模式,有三种:默认1
            //0:不进行消息接收确认,即Client端发送完成后不会等待Broker的确认
            //1:由Leader确认,Leader接收到消息后会立即返回确认信息
            //all:集群完整确认,Leader会等待所有in-sync的follower节点都确认收到消息后,再返回确认信息
            props.put("acks", "all");
            //发送失败时Producer端的重试次数,默认为0
            props.put("retries", 0);
            //当同时有大量消息要向同一个分区发送时,Producer端会将消息打包后进行批量发送。如果设置为0,则每条消息都独立发送。默认为16384字节
            props.put("batch.size", 16384);
            //发送消息前等待的毫秒数,与batch.size配合使用。在消息负载不高的情况下,
            //配置linger.ms能够让Producer在发送消息前等待一定时间,以积累更多的消息打包发送,达到节省网络资源的目的。默认为0
            props.put("linger.ms", 1);
            //消息缓冲池大小。尚未被发送的消息会保存在Producer的内存中,如果消息产生的速度大于消息发送的速度,
            //那么缓冲池满后发送消息的请求会被阻塞。默认33554432字节(32MB)
            props.put("buffer.memory", 33554432);
             //消息key/value的序列器Class,根据key和value的类型决定
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            System.out.println("Start send !!!!!!");
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            for (int i = 0; i < 5; i++)
            {
                //producer.send(new ProducerRecord<String, String>("sss", "HelloTest! "+ Integer.toString(i)));
                producer.send(new ProducerRecord<String, String>("ooo", Integer.toString(i), Integer.toString(i)));
                
                java.lang.Thread.sleep(2);
            }

            producer.close();
            System.out.println("Start send end !!!!!!");
            System.exit(0);
    }
}
View Code

 

  7.consumer类如下

package kafkamanager;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class consumer implements Runnable {

    private final KafkaConsumer<String, String> consumer;
     private final String topic;
    private static final String GROUPID = "groupA";

    public consumer(String topicName) {
        Properties props = new Properties();
        
        //********** 一定要注意:server.properties 的配置: listeners=PLAINTEXT://192.168.5.132:9092,这个地方配置成机器名接收不到消息。
        props.put("bootstrap.servers", "192.168.5.132:9092");
        
        //Consumer的group id,同一个group下的多个Consumer不会拉取到重复的消息,
        //不同group下的Consumer则会保证拉取到每一条消息。注意,同一个group下的consumer数量不能超过分区数。
        //必须要使用别的组名称, 如果生产者和消费者都在同一组,则不能访问同一组内的topic数据
        props.put("group.id", GROUPID);
        
        //是否自动提交已拉取消息的offset。提交offset即视为该消息已经成功被消费,该组下的Consumer无法再拉取到该消息(除非手动修改offset)。默认为true
        props.put("enable.auto.commit", "false");
        
        //自动提交offset的间隔毫秒数,默认5000。
        //本 例中采用的是自动提交offset,Kafka client会启动一个线程定期将offset提交至broker。假设在自动提交的间隔内发生故障(比如整个JVM进程死掉),那么有一部分消息是会被 重复消费的。要避免这一问题,可使用手动提交offset的方式。构造consumer时将enable.auto.commit设为false,并在代 码中用consumer.commitSync()来手动提交。
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        
        ////此配置参数表示当此groupId下的消费者,在ZK中没有offset值时(比如新的groupId,或者是zk数据被清空),
        //consumer应该从哪个offset开始消费.latest表示接受接收最大的offset(即最新消息),
        //earliest表示最小offset,即从topic的开始位置消费所有消息.最好设为earliest,这样新的分组,能从最开始进行处理
        //原由是Kafka新的消费者,默认情况下会从最后一条消费进行消费,就是开始消费的时候,会从新增加的消息开始处理,即从我开始添加的1000条以后,才会开始处理。
        //所以必须要设置auto.offset.reset设置新加入的消费者,从头条开始处理消费。当然有些情况,可能需要从最新的开始处理。
        //props.put("auto.offset.reset", "earliest");
        props.put("auto.offset.reset", "latest");
        //props.put("auto.offset.reset", "none");
        
        //props.put("max.poll.records", 2);//设置最新拉取数据条数
        
        //props.put("key.deserializer", StringDeserializer.class.getName());
        //props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        int messageNo = 1;
        System.out.println("---------开始消费---------");
        
        while (true) {

            //poll方法即是从Broker拉取消息,在poll之前首先要用subscribe方法订阅一个Topic
            //如 果Topic有多个partition,KafkaConsumer会在多个partition间以轮询方式实现负载均衡
            //。如果启动了多个 Consumer线程,Kafka也能够通过zookeeper实现多个Consumer间的调度,保证同一组下的Consumer不会重复消费消息。
            //注 意,Consumer数量不能超过partition数,超出部分的Consumer无法拉取到任何数据。
            ConsumerRecords<String, String> records = consumer.poll(100);//拉取超时毫秒数,如果没有新的消息可供拉取,consumer会等待指定的毫秒数,到达超时时间后会直接返回一个空的结果集
            for (ConsumerRecord<String, String> record : records) {

                System.out.println("fetched from partition " + record.partition() + ", offset: " + record.offset() + ",key:" + record.key() + ", message: " + record.value());
            }

            //提交已经拉取出来的offset,如果是手动模式下面,必须拉取之后提交,否则以后会拉取重复消息
            consumer.commitSync();

            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //break;

        }
        
    }  
    public static void main(String args[]) {
        consumer test1 = new consumer("ooo");
        Thread thread1 = new Thread(test1);
        thread1.start();
    }
}
View Code

  8.该实例已调通,希望对初学者有帮助。

 

  

转载于:https://www.cnblogs.com/jianzi/p/10026265.html

相关文章:

  • 高并发处理思路与手段(六):服务降级与服务熔断
  • 2014第6周日杂记
  • 26. Remove Duplicates from Sorted Array C++ 删除排序数组中的重复项
  • Usage of API documented as @since 1.8+”报错的解决办法
  • Mysql容器启动失败-解决方案
  • Linux的epoll模型
  • java B2B2C电子商务平台分析之七-Spring Cloud Config
  • 安排
  • Effective_STL 学习笔记(三十六) 了解 copy_if 的正确 实现
  • 强健x86平台
  • Dubbo各种协议详解
  • Java,console输出实时的转向GUI textbox
  • SpringBoot 日志框架
  • Expression.Bind()方法的应用
  • python基础----特性(property)、静态方法(staticmethod)、类方法(classmethod)、__str__的用法...
  • 78. Subsets
  • Android 控件背景颜色处理
  • Intervention/image 图片处理扩展包的安装和使用
  • JavaScript 一些 DOM 的知识点
  • Linux中的硬链接与软链接
  • Lucene解析 - 基本概念
  • MD5加密原理解析及OC版原理实现
  • MySQL主从复制读写分离及奇怪的问题
  • Odoo domain写法及运用
  • pdf文件如何在线转换为jpg图片
  • Python_网络编程
  • webpack4 一点通
  • XML已死 ?
  • 关于List、List?、ListObject的区别
  • 每个JavaScript开发人员应阅读的书【1】 - JavaScript: The Good Parts
  • 面试遇到的一些题
  • 400多位云计算专家和开发者,加入了同一个组织 ...
  • gunicorn工作原理
  • Spring第一个helloWorld
  • Unity3D - 异步加载游戏场景与异步加载游戏资源进度条 ...
  • 京东物流联手山西图灵打造智能供应链,让阅读更有趣 ...
  • ​Base64转换成图片,android studio build乱码,找不到okio.ByteString接腾讯人脸识别
  • # 再次尝试 连接失败_无线WiFi无法连接到网络怎么办【解决方法】
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (Redis使用系列) SpringBoot中Redis的RedisConfig 二
  • (附源码)ssm捐赠救助系统 毕业设计 060945
  • (附源码)ssm学生管理系统 毕业设计 141543
  • (过滤器)Filter和(监听器)listener
  • (排序详解之 堆排序)
  • (十二)python网络爬虫(理论+实战)——实战:使用BeautfulSoup解析baidu热搜新闻数据
  • (一)UDP基本编程步骤
  • (转)visual stdio 书签功能介绍
  • (转)负载均衡,回话保持,cookie
  • (转)一些感悟
  • .net core 连接数据库,通过数据库生成Modell
  • .Net MVC4 上传大文件,并保存表单
  • .NET Standard、.NET Framework 、.NET Core三者的关系与区别?
  • .NET 读取 JSON格式的数据
  • .net 验证控件和javaScript的冲突问题
  • .net 桌面开发 运行一阵子就自动关闭_聊城旋转门家用价格大约是多少,全自动旋转门,期待合作...