当前位置: 首页 > news >正文

kafka-java客户端连接

使用java客户端, kafkaproducer, kafkaconsumer进行kafka的连接

注: 0.10 版本之后, 连接kafka只需要brokerip即可, 不需要zookeeper的信息

1, kafka 配置信息

{
    "producer": {
        "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093",
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "max.request.size": "10485760",
        "batch.size": "163840",
        "buffer.memory": "536870912",
        "max.block.ms": "500",
        "retries": "3",
        "acks": "1",
    },
    "cosumer": {
        "bootstrap.servers": "10.183.93.127:9093,10.183.93.128:9093,10.183.93.130:9093",
        "group.id": "test222",
        "session.timeout.ms": "30000",
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer"
    }
}

 

2, kafka utils, 用来读取kafka的配置信息

package com.wenbronk.kafka;


import com.alibaba.fastjson.JSON;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.io.FileReader;
import java.util.Map;
import java.util.Properties;

public class KafkaUtils {
    @Test
    public void test() throws FileNotFoundException {
        getConfig("producer");
//        fastJSON();
    }

    public static JsonObject getConfig(String name) throws FileNotFoundException {
        JsonParser parser = new JsonParser();
        JsonElement parse = parser.parse(new FileReader("src/main/resources/kafka"));
        JsonObject jsonObject = parse.getAsJsonObject().getAsJsonObject(name);
        System.out.println(jsonObject);
        return jsonObject;
    }

    public static Properties getProperties(String sourceName) throws FileNotFoundException {
        JsonObject config = KafkaUtils.getConfig(sourceName);
        Properties properties = new Properties();

        for (Map.Entry<String, JsonElement> entry : config.entrySet()) {
            properties.put(entry.getKey(), entry.getValue().getAsString());
        }
        return properties;
    }

//    public static void fastJSON() throws FileNotFoundException {
//        Object o = JSON.toJSON(new FileReader("src/main/resources/kafka"));
//        System.out.println(o);
//    }

}

3, kafka producer

package com.wenbronk.kafka;

import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.junit.Test;

import javax.swing.text.StyledEditorKit;
import java.io.FileNotFoundException;
import java.util.*;
import java.util.stream.IntStream;

/**
 * 消息提供者
 */
public class KafkaProducerMain {

    @Test
    public void send() throws Exception {
        HashMap<String, String> map = new HashMap<>();
        map.put("http_zhixin", "send message to kafka from producer");
        for (int i = 0; i < 3; i++ ) {
            sendMessage(map);
        }
//        sendMessage(map);
    }

    /**
     * 消息发送
     */
    public void sendMessage(Map<String, String> topicMsg) throws FileNotFoundException {
        Properties properties = KafkaUtils.getProperties("producer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        for (Map.Entry<String, String> entry : topicMsg.entrySet()) {
            String topic = entry.getKey();
            String message = entry.getValue();
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic, message);
            // 发送
//            producer.send(record, new CallBackFuntion(topic, message));
            producer.send(record, (recordMetadata, e) -> {
                if (e != null) {
                    System.err.println(topic + ": " + message + "--消息发送失败");
                }else {
                    System.err.println(topic + ": " + message + "--消息发送成功");
                }
            });
        }
        producer.flush();
        producer.close();
    }
}

回掉函数可写匿名内部类, 也可写外部类通过新建的方式运行

package com.wenbronk.kafka;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

/**
 * 回掉函数
 */
public class CallBackFuntion implements Callback {

    private String topic;
    private String message;

    public CallBackFuntion(String topic, String message) {
        this.topic = topic;
        this.message = message;
    }

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            System.out.println(topic + ": " + message + "--消息发送失败");
        }else {
            System.out.println(topic + ": " + message + "--消息发送成功");
        }
    }
}

 

4, kafka consumer

package com.wenbronk.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.Test;

import java.io.FileNotFoundException;
import java.util.*;

public class KafkaConsumerMain {

    /**
     * 自动提交offset
     */
    public void commitAuto(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("cosumer");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records)
                System.err.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        }
    }

    /**
     * 手动提交offset
     *
     * @throws FileNotFoundException
     */
    public void commitControl(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("cosumer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        final int minBatchSize = 2;
        List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                buffer.add(record);
            }
            if (buffer.size() >= minBatchSize) {
                insertIntoDb(buffer);
                // 阻塞同步提交
                consumer.commitSync();
                buffer.clear();
            }
        }
    }

    /**
     * 手动设置分区
     */
    public void setOffSet(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("cosumer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
            // 处理每个分区消息后, 提交偏移量
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);

                for (ConsumerRecord<String, String> record : partitionRecords) {
                    System.out.println(record.offset() + ": " + record.value());
                }
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
            }
        }
    }

    /**
     * 手动设置消息offset
     */
    public void setSeek(List<String> topics) throws FileNotFoundException {
        Properties props = KafkaUtils.getProperties("cosumer");
        props.put("enable.auto.commit", "false");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(topics);
        consumer.seek(new TopicPartition("http_zhixin", 0), 797670770);
        ConsumerRecords<String, String> records = consumer.poll(100);

        for (ConsumerRecord<String, String> record : records) {
            System.err.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
            consumer.commitSync();
        }

    }

    @Test
    public void test() throws FileNotFoundException {
        ArrayList<String> topics = new ArrayList<>();
        topics.add("http_zhixin");

//        commitAuto(topics);
//        commitControl(topics);
//        setOffSet(topics);
        setSeek(topics);
    }

    /**
     * doSomethings
     */
    private void insertIntoDb(List<ConsumerRecord<String, String>> buffer) {
        buffer.stream().map(x -> x.value()).forEach(System.err::println);
    }


}

 

kafka 处于同一组的消费者, 不可以重复读取消息, 0.11版本中加入了事物控制

相关文章:

  • mysql学习笔记(四)--- 聚合函数、控制流程函数
  • .NET 同步与异步 之 原子操作和自旋锁(Interlocked、SpinLock)(九)
  • 让mysql查询强制走索引
  • Unity几个有用的游戏运动特效
  • 终端搜索工具
  • ubuntu 15.04
  • STM32 IAP docs
  • Dockerfile构建LNMP分离环境部署wordpress
  • 无人便利店代理的系统用于其他行业是否可以
  • bat遍历目录
  • JGit
  • 1006 等差数列
  • ambari HDFS-HA 回滚
  • V-4-1 vCenter的安装之配置ODBC
  • robotium之does not have a signature matching问题
  • 「译」Node.js Streams 基础
  • dva中组件的懒加载
  • JavaScript 基础知识 - 入门篇(一)
  • Linux链接文件
  • python3 使用 asyncio 代替线程
  • scala基础语法(二)
  • 分布式事物理论与实践
  • 京东美团研发面经
  • 什么软件可以提取视频中的音频制作成手机铃声
  • 收藏好这篇,别再只说“数据劫持”了
  • 数据可视化之 Sankey 桑基图的实现
  • 深度学习之轻量级神经网络在TWS蓝牙音频处理器上的部署
  • MiKTeX could not find the script engine ‘perl.exe‘ which is required to execute ‘latexmk‘.
  • 通过调用文摘列表API获取文摘
  • ​渐进式Web应用PWA的未来
  • #Lua:Lua调用C++生成的DLL库
  • #pragam once 和 #ifndef 预编译头
  • (a /b)*c的值
  • (C语言)编写程序将一个4×4的数组进行顺时针旋转90度后输出。
  • (echarts)echarts使用时重新加载数据之前的数据存留在图上的问题
  • (JSP)EL——优化登录界面,获取对象,获取数据
  • (ZT)北大教授朱青生给学生的一封信:大学,更是一个科学的保证
  • (待修改)PyG安装步骤
  • (二)斐波那契Fabonacci函数
  • (附源码)ssm跨平台教学系统 毕业设计 280843
  • (顺序)容器的好伴侣 --- 容器适配器
  • (推荐)叮当——中文语音对话机器人
  • (原創) 是否该学PetShop将Model和BLL分开? (.NET) (N-Tier) (PetShop) (OO)
  • (转载)在C#用WM_COPYDATA消息来实现两个进程之间传递数据
  • .NET/ASP.NETMVC 深入剖析 Model元数据、HtmlHelper、自定义模板、模板的装饰者模式(二)...
  • .NET/C# 使窗口永不获得焦点
  • .net开发引用程序集提示没有强名称的解决办法
  • .NET设计模式(11):组合模式(Composite Pattern)
  • .NET中的Exception处理(C#)
  • .net最好用的JSON类Newtonsoft.Json获取多级数据SelectToken
  • .pub是什么文件_Rust 模块和文件 - 「译」
  • @EventListener注解使用说明
  • @TableLogic注解说明,以及对增删改查的影响
  • [.NET 即时通信SignalR] 认识SignalR (一)
  • [20140403]查询是否产生日志