当前位置: 首页 > news >正文

kafka-生产者监听器(SpringBoot整合Kafka)

文章目录

  • 1、生产者监听器
    • 1.1、创建生产者监听器
    • 1.2、创建生产者拦截器
    • 1.3、发送消息测试
    • 1.4、使用Java代码创建主题分区副本
    • 1.5、application.yml配置----v1版
    • 1.6、屏蔽 kafka debug 日志 logback.xml
    • 1.7、引入spring-kafka依赖
    • 1.8、控制台日志

1、生产者监听器

1.1、创建生产者监听器

package com.atguigu.kafka.listener;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.kafka.support.ProducerListener;
@Component
public class MyKafkaProducerListener implements ProducerListener<String,String> {//生产者 ack 配置为 0 只要发送即成功//ack为 1  leader落盘  broker ack之后 才成功//ack为 -1 分区所有副本全部落盘  broker ack之后 才成功@Overridepublic void onSuccess(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata) {//ProducerListener.super.onSuccess(producerRecord, recordMetadata);System.out.println("MyKafkaProducerListener消息发送成功:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());}//消息发送失败的回调:监听器可以接收到发送失败的消息 可以记录失败的消息@Overridepublic void onError(ProducerRecord<String, String> producerRecord, RecordMetadata recordMetadata, Exception exception) {System.out.println("MyKafkaProducerListener消息发送失败:"+"topic="+producerRecord.topic()+",partition = "+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value()+",offset = "+recordMetadata.offset());System.out.println("异常信息:" + exception.getMessage());}
}

1.2、创建生产者拦截器

package com.atguigu.kafka.interceptor;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.stereotype.Component;
import java.util.Map;
//拦截器必须手动注册给kafka生产者(KafkaTemplate)
@Component
public class MyKafkaInterceptor implements ProducerInterceptor<String,String> {//kafka生产者发送消息前执行:拦截发送的消息预处理@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {System.out.println("生产者即将发送消息:topic = "+ producerRecord.topic()+",partition:"+producerRecord.partition()+",key = "+producerRecord.key()+",value = "+producerRecord.value());return null;}//kafka broker 给出应答后执行@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {//exception为空表示消息发送成功if(e == null){System.out.println("消息发送成功:topic = "+ recordMetadata.topic()+",partition:"+recordMetadata.partition()+",offset="+recordMetadata.offset()+",timestamp="+recordMetadata.timestamp());}}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> map) {}
}

1.3、发送消息测试

package com.atguigu.kafka.producer;import com.atguigu.kafka.interceptor.MyKafkaInterceptor;
import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import java.io.IOException;@SpringBootTest
class KafkaProducerApplicationTests {//装配kafka模板类: springboot启动时会自动根据配置文初始化kafka模板类对象注入到容器中@ResourceKafkaTemplate kafkaTemplate;@ResourceMyKafkaInterceptor myKafkaInterceptor;@PostConstructpublic void init() {kafkaTemplate.setProducerInterceptor(myKafkaInterceptor);}@Testvoid contextLoads() throws IOException {kafkaTemplate.send("my_topic1", "spring-kafka-生产者监听器");//回调是等kafka,ack以后才执行,需要阻塞System.in.read();}
}

1.4、使用Java代码创建主题分区副本

package com.atguigu.kafka.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.stereotype.Component;
@Component
public class KafkaTopicConfig {@Beanpublic NewTopic myTopic1() {//相同名称的主题 只会创建一次,后面创建的主题名称相同配置不同可以做增量更新(分区、副本数)return TopicBuilder.name("my_topic1")//主题名称.partitions(3)//主题分区.replicas(3)//主题分区副本数.build();//创建}
}

1.5、application.yml配置----v1版

server:port: 8110# v1
spring:kafka:bootstrap-servers: 192.168.74.148:9095,192.168.74.148:9096,192.168.74.148:9097producer: # producer 生产者retries: 0 # 重试次数 0表示不重试acks: -1 # 应答级别:多少个分区副本备份完成时向生产者发送ack确认(可选01-1/all)batch-size: 16384 # 批次大小 单位bytebuffer-memory: 33554432 # 生产者缓冲区大小 单位bytekey-serializer: org.apache.kafka.common.serialization.StringSerializer # key的序列化器value-serializer: org.apache.kafka.common.serialization.StringSerializer # value的序列化器

1.6、屏蔽 kafka debug 日志 logback.xml

<configuration>      <!-- 如果觉得idea控制台日志太多,src\main\resources目录下新建logback.xml
屏蔽kafka debug --><logger name="org.apache.kafka.clients" level="debug" />
</configuration>

1.7、引入spring-kafka依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>3.0.5</version><relativePath/> <!-- lookup parent from repository --></parent><!-- Generated by https://start.springboot.io --><!-- 优质的 spring/boot/data/security/cloud 框架中文文档尽在 => https://springdoc.cn --><groupId>com.atguigu.kafka</groupId><artifactId>kafka-producer</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka-producer</name><description>kafka-producer</description><properties><java.version>17</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

1.8、控制台日志

生产者即将发送消息:topic = my_topic1,partition:null,key = null,value = spring-kafka-生产者监听器
消息发送成功:topic = my_topic1,partition:0,offset=0,timestamp=1717573749549
MyKafkaProducerListener消息发送成功:topic=my_topic1,partition = null,key = null,value = spring-kafka-生产者监听器,offset = 0

在这里插入图片描述

在这里插入图片描述

[[{"partition": 0,"offset": 0,"msg": "spring-kafka-生产者监听器","timespan": 1717573749549,"date": "2024-06-05 07:49:09"}]
]

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • The First项目报告:Stargate Finance重塑跨链金融的未来
  • Vxe UI vxe-form 实现折叠表单,当表单很多时实现自动收起与展开
  • 独具魅力的 App UI 风格才能称之为优秀
  • 66. UE5 RPG 实现远程攻击武器配合角色攻击动画
  • 1. 面向对象的由来
  • MyBatis面试题系列三
  • 何为屎山代码?
  • Facebook海外户|如何制作出引人注目的Facebook广告素材?
  • lua vm 五: upvalue
  • django ORM model update常规用法
  • java面试题:java三大特性多态又是如何实现的
  • 作文笔记11 推荐一本书
  • 磁力狗ciligou,磁力链接使用步骤
  • Java--Math类和Random类
  • 【设计模式】面向对象与UML
  • EOS是什么
  • IE报vuex requires a Promise polyfill in this browser问题解决
  • java 多线程基础, 我觉得还是有必要看看的
  • Java-详解HashMap
  • js写一个简单的选项卡
  • Linux中的硬链接与软链接
  • PHP的类修饰符与访问修饰符
  • Python 使用 Tornado 框架实现 WebHook 自动部署 Git 项目
  • Python学习笔记 字符串拼接
  • sessionStorage和localStorage
  • WebSocket使用
  • 一文看透浏览器架构
  • ​​​​​​​ubuntu16.04 fastreid训练过程
  • ​字​节​一​面​
  • # Panda3d 碰撞检测系统介绍
  • (2)(2.10) LTM telemetry
  • (2)STM32单片机上位机
  • (arch)linux 转换文件编码格式
  • (LeetCode 49)Anagrams
  • (zt)基于Facebook和Flash平台的应用架构解析
  • (二)正点原子I.MX6ULL u-boot移植
  • (附源码)springboot猪场管理系统 毕业设计 160901
  • (几何:六边形面积)编写程序,提示用户输入六边形的边长,然后显示它的面积。
  • (经验分享)作为一名普通本科计算机专业学生,我大学四年到底走了多少弯路
  • (九)c52学习之旅-定时器
  • ***原理与防范
  • .dwp和.webpart的区别
  • .NET MAUI Sqlite程序应用-数据库配置(一)
  • .NET/C# 检测电脑上安装的 .NET Framework 的版本
  • .NET/C# 使用反射注册事件
  • .Net6支持的操作系统版本(.net8已来,你还在用.netframework4.5吗)
  • .net获取当前url各种属性(文件名、参数、域名 等)的方法
  • .net实现头像缩放截取功能 -----转载自accp教程网
  • @SuppressLint(NewApi)和@TargetApi()的区别
  • [ C++ ] STL_list 使用及其模拟实现
  • [12] 使用 CUDA 进行图像处理
  • [2009][note]构成理想导体超材料的有源THz欺骗表面等离子激元开关——
  • [Android] Amazon 的 android 音视频开发文档
  • [Android]常见的数据传递方式
  • [BT]小迪安全2023学习笔记(第29天:Web攻防-SQL注入)