kafka 代码使用
目录
简单使用
Producer消息发送方
发送消息到指定分区上
未指定分区
同步发送
异步发消息
ACK参数设置
消息发送缓冲区
编辑Consumer消息消费者
简单实现
提交offset
长轮询poll消息
心跳 -- 健康检查
指定分区和偏移量、时间消费
新消费组消费offset规则
整合springboot
依赖
配置文件
Producer生产者
Consumer消费者
设置消费组、多个topic、指定分区、指定偏移量消费及消费个数
简单使用
依赖
最好是根据kafka的版本,引入对应的依赖版本
其中kafka_2.13-2.8.1这个目录就是自带了版本号。
前面为scala版本,后面为kafka版本。
样例说明:2.13为scala版本,2.8.1为kafka版本。
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
Producer消息发送方
public class MySimpleProducer {
private final static String TOPIC_NAME = "my-replicated-topic";
public static void main(String[] args) throws Exception {
//1.设置参数
Properties props = new Properties();
//设置kafka集群
// props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
// "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
//把发送的key从字符串序列化为字节数组
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//把发送消息value从字符串序列化为字节数组
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//2.创建⽣产消息的客户端,传⼊参数
Producer<String, String> producer = new KafkaProducer<>(props);
//3.创建消息
//key:作⽤是决定了往哪个分区上发,value:具体要发送的消息内容
ProducerRecord<String, String> producerRecord =
new ProducerRecord<>(TOPIC_NAME, "mykeyvalue", "hellokafka");
//4.发送消息,得到消息发送的元数据并输出
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步⽅式发送消息结果:" + "topic-" +
metadata.topic()
+ "|partition-" + metadata.partition()
+ "|offset-" + metadata.offset());
}
}
ProducerRecord可以不带key的构造方法,但是推荐使用代用key的,如果存在分区,kafka会根据key的经过特定的hash算法,基础这个消息应该落到哪个分区。
结果如下: partition表示分区,offset表示偏移量. 所以这个消息发送到了0 分区,偏移量为1
发送消息到指定分区上
ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>(TOPIC_NAME , 0, order.getOrderId().toString(), JSON.toJSONString(order));
第二个参数执行分区,有特殊要求时才会用,一般不使用
未指定分区
则会通过业务key的hash运算,算出消息往哪个分区上发
//未指定发送分区,具体发送的分区计算公式:hash(key)%partitionNum
ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>(TOPIC_NAME, order.getOrderId().toString(), JSON.toJSONString(order));
同步发送
//等待消息发送成功的同步阻塞⽅法
RecordMetadata metadata = producer.send(producerRecord).get();
System.out.println("同步⽅式发送消息结果:"
+ "topic-" + metadata.topic()
+ "|partition-" + metadata.partition()
+ "|offset-" + metadata.offset());
异步发消息
//要发送5条消息
Order order = new Order((long) i, i);
//指定发送分区
ProducerRecord<String, String> producerRecord = new
ProducerRecord<String, String>(TOPIC_NAME,
order.getOrderId().toString(), JSON.toJSONString(order));
//异步回调方式发送消息
producer.send(producerRecord, new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception != null) {
System.err.println("发送消息失败:" + exception.getStackTrace());
}
if (metadata != null) {
System.out.println("异步方式发送消息结果:"
+ "topic-" + metadata.topic()
+ "|partition-" + metadata.partition()
+ "|offset-" + metadata.offset());
}
}
});
ACK参数设置
- ack = 0 :消息到达kafka-cluster,kafka-cluster不需要任何的broker收到消息,就立即返回ack给生产者,最容易丢消息的,效率是最高的
- ack = 1(默认):多副本之间的leader已经收到消息,并把消息写入到本地log中,才返回ack给生产者,性能和安全性是最均衡的
- ack = -1/all. 搭配kafka的min.insync.replicas配置,如果min.insync.replicas=2(默认为1,推荐配置大于等于2),此时就需要leader和一个follower同步完后,才会返回ack给生产者(此时集群中有2个broker已完成数据的接收),这种方式最安全,但性能最差。
关于ack以及重试(没有收到ack,会重新发送消息)的配置:
//设置ack的值 = 1
props.put(ProducerConfig.ACKS_CONFIG, "1");
/*
发送失败会重试,默认重试间隔100ms,重试能保证消息发送的可靠性,但是也可能造
成消息重复发送,比如网络抖动,所以需要在接收者那边做好消息接收的幂等性处理
*/
//设置重试次数
props.put(ProducerConfig.RETRIES_CONFIG, 3);
//重试间隔设置
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);
消息发送缓冲区
- kafka默认会创建一个消息缓冲区,用来存放要发送的消息,缓冲区是32m
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
- kafka本地线程会去缓冲区中一次拉16k的数据,发送到broker
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
- 如果线程拉不到16k的数据,间隔10ms也会将已拉倒的数据发到broker
props.put(ProducerConfig.LINGER_MS_CONFIG, 10);
Consumer消息消费者
简单实现
package com.lb;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class MySimplerConsumer {
private final static String TOPIC_NAME = "my-replicated-topic";
private final static String CONSUMER_GROUP_NAME = "testGroup";
public static void main(String[] args) {
Properties props = new Properties();
//设置kafka集群
// props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
// "172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.99.100:9092");
//设置分组名称
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP_NAME);
//设置key和value的反序列化
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
//1.创建一个消费者客户端
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
//2.订阅一个主题列表
consumer.subscribe(Arrays.asList(TOPIC_NAME));
while (true) {
/*
* 3.poll()是拉取消息的长轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
//4.打印消息
System.out.printf("收到消息:partition = %d,offset = %d, key = %s, value = %s%n", record.partition(),
record.offset(), record.key(), record.value());
}
}
}
}
提交offset
1)提交内容
消费者无论是自动提交还是手动提交,都需要把
所属的消费组+消费的主题+消费分区+消费的偏移量
这些信息提交给集群的_consumer_offsets主题里面
2) 自动提交
消费者poll消息下来以后就会自动提交offset
// 是否主动提交offset,默认就是true
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
// 主动提交offset的间隔时间
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
注意:自动提交会丢失消息。因为消费者会在消费前提前offset,有可能提交完后还没有消费时就挂了
3) 手动提交
需要把手动提交的配置设置成false
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
手动提交又分成了两种:
- 手动同步提交
在消费完消息后调用同步提交的方法,当集群返回ack前一直阻塞,返回ack后表示提交offset成功,执行之后的逻辑
while (true) {
/*
* poll() API 是拉取消息的长轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key= %s, value =%s%n",
record.partition(),record.offset(), record.key(), record.value());
}
//所有的消息已消费完
if (records.count() > 0) {//有消息
// 手动同步提交offset,当前线程会阻塞直到offset提交成功
// 一般使用同步提交
consumer.commitSync();//=======阻塞=== 提交成功
}
}
}
一般使用同步提交,记得给consumer.commitSync();进行异常捕获,添加异常处理
- 手动异步提交
在消息消费完后提交,不需要等到集群的ack,直接执行后续逻辑,可以设置一个回调方法,够集群调用
while (true) {
/*
* poll() API 是拉取消息的⻓轮询
*/
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key= % s, value = %s % n ",
record.partition(), record.offset(), record.key(), record.value());
}
//所有的消息已消费完
if (records.count() > 0) {
// ⼿动异步提交offset,当前线程提交offset不会阻塞,可以继续处理后⾯的程序逻辑
consumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
if (exception != null) {
System.err.println("Commit failed for " + offsets);
System.err.println("Commit failed exception: " +
exception.getStackTrace());
}
}
});
}
}
长轮询poll消息
- 默认情况下,消费者一次会poll500条消息
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
- 代码中设置了长轮询的时间是1000毫秒
while (true) {
/*
* poll() API 是拉取消息的长轮询
*/
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("收到消息:partition = %d,offset = %d, key = %s,value = %s%n",
record.partition(),record.offset(), record.key(), record.value());
}
poll数量 & 长轮询 的意思是:
- 如果一次poll到500条消息,就直接返回执行for循环的逻辑
- 如果一次poll没有拿到500条消息。且时间仍在1秒内,那么长轮询继续poll,直到消息数量达到500条,就返回执行for循环的逻辑
- 如果多次poll都没有达到500条,且1秒时间已经到了,就返回执行for循环的逻辑
- 如果两个poll的时间间隔超过30s,集群会认为该消费者的消费能力过弱,将该消费者提出消费组,触发rebalance机制,rebalance机制会造成性能开销,则可以通过减少poll条数的设置,来满足时间限制
//一次poll最大拉取消息的条数,可以根据消费速度的快慢来设置props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 500);
//如果两次poll的时间如果超出了30s的时间间隔,kafka会认为其消费能⼒过弱,将其踢出消费组。将分区分配给其他消费者。-rebalance
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30 * 1000);
心跳 -- 健康检查
消费者每隔1s向kafka集群发送心跳,集群发现如果超过10s没有收到消费者的心跳,将被提出消费组,触发该分区消费组的rebalance机制,将该分区交给消费组里的其他消费者进行消费
//consumer给broker发送⼼跳的间隔时间
props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 1000);
//kafka如果超过10秒没有收到消费者的心跳,则会把消费者踢出消费组,进行rebalance,把分区分配给其他消费者。
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10 * 1000);
指定分区和偏移量、时间消费
- 指定分区消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
- 从头消费 -- offset = 0 开始消费
consumer.assign(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
consumer.seekToBeginning(Arrays.asList(new TopicPartition(TOPIC_NAME, 0)));
- 指定时间消费
根据时间,去所有的partition中确定该时间的对应的offset,然后去所有partition中找到该offset之后开始消费消息
//获取主题的所有分区
List<PartitionInfo> topicPartitions = consumer.partitionsFor(TOPIC_NAME);
//从1小时前开始消费
long fetchDataTime = new Date().getTime() - 1000 * 60 * 60;
Map<TopicPartition, Long> map = new HashMap<>();
for (PartitionInfo par : topicPartitions) {
map.put(new TopicPartition(TOPIC_NAME, par.partition()), fetchDataTime);
}
//根绝主题以及所有分区,以及提供的时间点 -- 获取到分区对应时间点的 offset偏移量
Map<TopicPartition, OffsetAndTimestamp> parMap = consumer.offsetsForTimes(map);
for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : parMap.entrySet()) {
//获得分区
TopicPartition key = entry.getKey();
//获得时间对应分区偏移量的对象
OffsetAndTimestamp value = entry.getValue();
if (key == null || value == null) continue;
Long offset = value.offset();
System.out.println("partition-" + key.partition() +
"|offset-" + offset);
//根据消费⾥的timestamp确定offset
if (value != null) {
//设置分区
consumer.assign(Arrays.asList(key));
//根绝分区以及偏移量 获取之后的所有消息
consumer.seek(key, offset);
}
}
新消费组消费offset规则
新消费组中的消费者在启动以后,默认会从当前分区最后一条消息的offset+1开始消费(消费新的消息)。可以通过以下的设置,让新的消费者第一次从头开始消费。之后开始消费新的消息(最后消费的位置的偏移量 + 1)
- Latest: 默认的,消费新消息
- earliest:第一次从头开始消费。之后开始消费新消息(最后消费的位置的偏移量 + 1)
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
整合springboot
依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
配置文件
spring:
kafka:
#设置kafka集群
#bootstrap-servers: 172.16.253.38:9092,172.16.253.38:9093,172.16.253.38:9094
bootstrap-servers: 192.168.99.100:9092
#设置生成者
producer:
# 1:leader接收到消息落盘后就返回ack
acks: 1
#设置重试次数 大于0的值
retries: 3
#去缓冲区中一次拉16k的数据,发送到broker
batch-size: 16384
#缓冲区大小32M
buffer-memory: 33554432
#指定消息key和value的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
#设置消费者
consumer:
#默认消费组
group-id: testGroup
#关闭自动提交
enable-auto-commit: false
#新消费组从头开始消费
auto-offset-reset: earliest
#设置消息key和value的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#一次poll最大拿500条消息
max-poll-records: 500
properties:
# 消费会话超时时间(超过这个时间consumer没有发送心跳,就会触发rebalance操作)
session:
timeout:
ms: 120000
# 消费消息超时时间
request:
timeout:
ms=180000:
listener:
#RECORD 当每⼀条记录被消费者监听器(ListenerConsumer)处理之后提交
#BATCH 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
#TIME 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间⼤于TIME时提交
#COUNT 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量⼤于等于COUNT时提交
#COUNT_TIME TIME | COUNT 有⼀个条件满⾜时提交
#MANUAL 当每⼀批poll()的数据被消费者监听器(ListenerConsumer)处理之后, ⼿动调⽤Acknowledgment.acknowledge()后提交
#MANUAL_IMMEDIATE ⼿动调⽤Acknowledgment.acknowledge()后⽴即提交,⼀般使⽤这种
ack-mode: manual_immediate
Producer生产者
private final static String TOPIC_NAME = "my-replicated-topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
/**
* send方法默认是异步的
*
* @return
*/
@RequestMapping("/send")
public String send() {
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message");
return "send success";
}
/**
* 同步发送 future.get()方法阻塞等到结果返回
*/
@RequestMapping("/send3")
public String send_syn() {
try {
SendResult<String, String> sendResult =
kafkaTemplate.send(TOPIC_NAME, 0, "key", "this is a message").get();
ProducerRecord<String, String> record = sendResult.getProducerRecord();
RecordMetadata metadata = sendResult.getRecordMetadata();
System.out.println("消息发送成功");
System.out.printf("消息key = %s ,value = %s%n", record.key(), record.value());
System.out.printf("主题 = %s, partition = %d, offset = %d%n", metadata.topic(), metadata.partition(), metadata.offset());
return "send success";
} catch (Exception e) {
//发生异常时
e.printStackTrace();
return "send fail";
}
}
异步发送消息添加回调监听
package com.lb.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.support.ProducerListener;
import org.springframework.stereotype.Component;
@Component
public class ProduceListener implements ProducerListener {
private static final Logger log = LoggerFactory.getLogger(ProduceListener.class);
@Override
public void onSuccess(ProducerRecord producerRecord, RecordMetadata recordMetadata) {
log.info("Message send success : " + producerRecord.toString());
log.info("Message send success topic:{} , partition:{} , offset: {}",
recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset());
}
@Override
public void onError(ProducerRecord producerRecord, Exception exception) {
log.info("Message send success : " + producerRecord.toString());
}
}
Consumer消费者
package com.lb.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
@Component
public class MyConsumer {
/**
* groupId = MyGroup1 消费 my-replicated-topic 主题的消息
* 这里的groupId是自己设定消费组,如果不设定则使用yml中默认的消费组
* @param record 说明此方法是消费一条消息的逻辑
* @param ack
*/
@KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1")
public void listen_MyReplicatedTopic_MyGroup1_01(
ConsumerRecord<String,String> record, Acknowledgment ack){
String value = record.value();
System.out.printf(
"groupId = MyGroup1 接受到 %s 的一条消息,内容是:" +
"partition = %d,offset = %d, key = %s, value = %s%n",
record.topic(),record.partition(),record.offset(), record.key(), record.value());
//手动提交offset
ack.acknowledge();
}
/**
* @param records 消费多条消息的逻辑,只是方法参数不一样,2种都支持
* @param ack
*/
/* @KafkaListener(topics = "my-replicated-topic", groupId = "MyGroup1")
public void listen_MyReplicatedTopic_MyGroup1_01(
ConsumerRecords<String,String> records, Acknowledgment ack){
for (ConsumerRecord<String, String> record : records) {
String value = record.value();
System.out.printf(
"groupId = MyGroup1 接受到 %s 的一条消息,内容是:" +
"partition = %d,offset = %d, key = %s, value = %s%n",
record.topic(),record.partition(),record.offset(), record.key(), record.value());
}
//手动提交offset
ack.acknowledge();
}*/
}
2个Consumer的区别在于
1.一个是消费一条消息的逻辑,一个是消费一组消息的逻辑
2.ack.acknowledge();如果配合配置文件中的spring.kafka.listener.ack-mode.如果配置值为manual_immediate则表示第一个方法消费一条信息就给kafka发送这条消息的确认接受的ack,第二个方法时消费完一组消息,一起提交ack。而如果配置值为MANUAL,则表示即使在代码中是每条消息都发送的了确认ack,依然是每一批poll都消费完了,一起确认。
设置消费组、多个topic、指定分区、指定偏移量消费及消费个数
@KafkaListener(groupId = "testGroup",
topicPartitions = {
@TopicPartition(topic = "topic1", partitions = {"0", "1"}),
@TopicPartition(topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1",
initialOffset = "100"))
},
concurrency = "3")//concurrency就是同组下的消费者个数,就是并发消费数, 建议⼩于等于分区总数
public void listenGroup(ConsumerRecord<String, String> record,
Acknowledgment ack) {
String value = record.value();
System.out.println(value);
System.out.println(record);
//⼿动提交offset
ack.acknowledge();
}
注解含义:
groupId: 消费组名称
topicPartitions :主题和分区信息集合
TopicPartition:一个topic以及分区信息
topic = "topic1", partitions = {"0", "1"} : 消费主题为topic1,分区0和1的消息
topic = "topic2", partitions = "0",
partitionOffsets = @PartitionOffset(partition = "1",initialOffset = "100")
:消费主题topic2,分区0的所有消息,分区1,offset=100以后的消息
concurrency = "3" :表明创建同一个消费组下消费者个个数,不要大于分区数,否则会存在空闲的消费者。