当前位置: 首页 > news >正文

SpringBoot集成kafka-自定义拦截器(可以在拦截器中做记录日志、安全检查等操作)

@TOC

在这里插入图片描述

1、kafka配置类

  • kafka配置类添加@Configuration注解,springboot启动后会自动读取该配置类;
  • 由于在application.yml文件中我们找不到kafak拦截器相关的配置项,因此需要自定义拦截器;
  • 消费者相关配置方法中添加自定义拦截器配置,这样就可以在自定义拦截器中处理个性化业务需求;
  • 配置类中需要注入消费者工厂bean和消费者监听器工厂,以替换kafak内置默认的消费者工厂和消费者监听器工厂。
package com.power.config;import com.power.Inteceptor.CustomConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;import java.util.HashMap;
import java.util.Map;/*** kafka配置类*/
@Configuration
public class KafkaConfig {@Value("${spring.kafka.bootstrap-servers}")private String bootstrapServers;@Value("${spring.kafka.consumer.key-deserializer}")private String keyDeserializer;@Value("${spring.kafka.consumer.value-deserializer}")private String valueDeserializer;/*** 消费者相关配置* @return*/public Map<String,Object> consumerConfigs(){Map<String,Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,bootstrapServers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,keyDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);//添加一个消费者拦截器props.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomConsumerInterceptor.class.getName());return props;}/*** 消费者工厂*/@Beanpublic ConsumerFactory<String,String> consumerFactory(){return new DefaultKafkaConsumerFactory<>(consumerConfigs());}@Beanpublic KafkaListenerContainerFactory<?> ourKafkaListenerContainerFactory(ConsumerFactory<String,String> ourConsumerFactory){ConcurrentKafkaListenerContainerFactory<String,String> listenerContainerFactory = new ConcurrentKafkaListenerContainerFactory<>();listenerContainerFactory.setConsumerFactory(ourConsumerFactory);return listenerContainerFactory;}}

2、自定义拦截器类

package com.power.Inteceptor;import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecords;import java.util.Map;/*** 自定义的消费者拦截器*/
public class CustomConsumerInterceptor implements ConsumerInterceptor<String,String> {/*** 在消费消息之前执行* @param record* @return*/@Overridepublic ConsumerRecords onConsume(ConsumerRecords record) {System.out.println("onConsumer方法执行(在消费消息之前执行),record="+record);return record;}/*** 在拿到消息之后,提交offset之前执行该方法* @param offsets*/@Overridepublic void onCommit(Map offsets) {System.out.println("onCommit方法执行(在拿到消息之后,提交offset之前执行该方法),offsets="+offsets);}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

3、消费者

package com.power.consumer;import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;@Component
public class EventConsumer {@KafkaListener(topics = {"interceptorTopic"}, groupId = "interceptorGroup", containerFactory = "ourKafkaListenerContainerFactory")public void onEvent(ConsumerRecord<String, String> record) {System.out.println("消费者消费消息record = " + record);}
}

4、生产者

package com.power.producer;import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.Date;@Component
public class EventProducer {@Resourceprivate KafkaTemplate<String,Object> kafkaTemplate;public void sendEvent(){User user = User.builder().id(1).phone("15676767673").birthday(new Date()).build();String userJson = JSONUtils.toJSON(user);kafkaTemplate.send("interceptorTopic","k", userJson);}}

5、实体类(用于发送接收对象消息)

package com.power.model;import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;import java.util.Date;@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {private Integer id;private String phone;private Date birthday;}

6、JSON工具类

package com.power.util;import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;public class JSONUtils {private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();public static String toJSON(Object object){try {return OBJECTMAPPER.writeValueAsString(object);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}public static <T> T toBean(String json,Class<T> clazz){try {return OBJECTMAPPER.readValue(json,clazz);} catch (JsonProcessingException e) {throw new RuntimeException(e);}}
}

7、启动类

package com.power;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;import java.util.Map;@SpringBootApplication
public class Kafka04Application {public static void main(String[] args) {ConfigurableApplicationContext context = SpringApplication.run(Kafka04Application.class, args);Map<String, ConsumerFactory> beansOfType = context.getBeansOfType(ConsumerFactory.class);beansOfType.forEach((k,v)->{System.out.println(k+" -- "+v);});System.out.println("----------------------------------------------------");Map<String, KafkaListenerContainerFactory> beansOfType2 = context.getBeansOfType(KafkaListenerContainerFactory.class);beansOfType2.forEach((k,v)->{System.out.println(k+" -- "+v);});}}

以下红框内容用于查看SpringBoot启动后注入的类型
在这里插入图片描述
在这里插入图片描述

8、测试类

package com.power;import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;import javax.annotation.Resource;@SpringBootTest
public class SpringBoot04KafkaBaseApplication {@Resourceprivate EventProducer eventProducer;@Testvoid sendInterceptor(){eventProducer.sendEvent();}}

9、测试

  • 先启动消费者监听
  • 在启动生产者发送消息
  • 测试结果发现,消费者走了我们自定义的拦截器

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 汽车线束品牌服务商推荐-力可欣:致力于汽车连接线束和汽车连接器的开发、生产和应用
  • 挂个人-CSDN Java优秀内容博主rundreamsFly抄袭
  • C语言从头学51—多文件项目
  • 培训第三十八天(上传镜像,私有仓库下载镜像,跨主机容器间的通信,harbor软件包下载)
  • RK3568平台(平台总线篇)SPI驱动框架分析
  • C语言家教记录(八)
  • 豆包插件分享
  • C++系列-类模板案例
  • 最大噪音值甚至受法规限制,如何基于LBM算法有效控制风扇气动噪音
  • 数据结构之链表
  • AI辅助编码在主流IDE中的智能代码补全说明
  • 一 初识爬虫
  • 分享7款宝藏APP,用途多样,值得一试
  • centos7.9系统安装cloudpods并使用ceph存储(二)
  • kubernetes培训
  • [ JavaScript ] 数据结构与算法 —— 链表
  • [译]Python中的类属性与实例属性的区别
  • 【前端学习】-粗谈选择器
  • Angular 响应式表单之下拉框
  • CSS 提示工具(Tooltip)
  • iOS小技巧之UIImagePickerController实现头像选择
  • Java超时控制的实现
  • oldjun 检测网站的经验
  • PHP 程序员也能做的 Java 开发 30分钟使用 netty 轻松打造一个高性能 websocket 服务...
  • SQLServer插入数据
  • vue和cordova项目整合打包,并实现vue调用android的相机的demo
  • Webpack 4 学习01(基础配置)
  • yii2中session跨域名的问题
  • 关于extract.autodesk.io的一些说明
  • 基于axios的vue插件,让http请求更简单
  • 解析 Webpack中import、require、按需加载的执行过程
  • 开放才能进步!Angular和Wijmo一起走过的日子
  • 前端设计模式
  • 如何打造100亿SDK累计覆盖量的大数据系统
  • No resource identifier found for attribute,RxJava之zip操作符
  • 如何用纯 CSS 创作一个菱形 loader 动画
  • ​云纳万物 · 数皆有言|2021 七牛云战略发布会启幕,邀您赴约
  • (2024,RWKV-5/6,RNN,矩阵值注意力状态,数据依赖线性插值,LoRA,多语言分词器)Eagle 和 Finch
  • (Qt) 默认QtWidget应用包含什么?
  • (二开)Flink 修改源码拓展 SQL 语法
  • (附源码)spring boot球鞋文化交流论坛 毕业设计 141436
  • (佳作)两轮平衡小车(原理图、PCB、程序源码、BOM等)
  • (三)Hyperledger Fabric 1.1安装部署-chaincode测试
  • (十)Flink Table API 和 SQL 基本概念
  • (十六)视图变换 正交投影 透视投影
  • (转)Android中使用ormlite实现持久化(一)--HelloOrmLite
  • (转)Linux整合apache和tomcat构建Web服务器
  • (转)Sql Server 保留几位小数的两种做法
  • (转)我也是一只IT小小鸟
  • **PyTorch月学习计划 - 第一周;第6-7天: 自动梯度(Autograd)**
  • .bat批处理(十):从路径字符串中截取盘符、文件名、后缀名等信息
  • .gitignore文件设置了忽略但不生效
  • .NET Core 版本不支持的问题
  • .NET MAUI Sqlite数据库操作(二)异步初始化方法
  • .NET 解决重复提交问题