当前位置: 首页 > news >正文

一个简单的spring+kafka生产者

1. pom

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency>

2. 生产者

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.xxx.npi.module.common.msg.dto.MsgBase;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;import java.util.ArrayList;
import java.util.List;@Service
public class MyMessageProducerService {@Value("${npi.default-url}")private String domain;private final KafkaTemplate<String, String> kafkaTemplate;public MyMessageProducerService(KafkaTemplate<String, String> kafkaTemplate) {this.kafkaTemplate = kafkaTemplate;}public <T extends MsgBase> void sendMessage(String topicName, T msgObj) {List<T> list = new ArrayList<>();list.add(msgObj);if("https://npi.xxx.com".equals(domain)){kafkaTemplate.send(topicName, toJsonString(list));}}public <T extends MsgBase> void sendMessage(String topicName, List<T> list) {if("https://npi.xxx.com".equals(domain)){kafkaTemplate.send(topicName, toJsonString(list));}}private String toJsonString(Object obj) {return JSON.toJSONString(obj,SerializerFeature.WriteDateUseDateFormat,SerializerFeature.WriteMapNullValue,SerializerFeature.WriteNullListAsEmpty,SerializerFeature.WriteNullStringAsEmpty,SerializerFeature.DisableCircularReferenceDetect);}}

3. 配置

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.Resource;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;@Configuration
@EnableKafka
public class KafkaProducerConfig {@Value("${spring.kafka.producer.bootstrap-servers}")private String servers;@Value("${spring.kafka.producer.retries}")private int retries;@Value("${spring.kafka.producer.acks}")private String acks;@Value("${spring.kafka.producer.batch-size}")private int batchSize;@Value("${spring.kafka.producer.linger-ms}")private int lingerMs;@Value("${spring.kafka.producer.buffer-memory}")private int bufferMemory;@Value("${spring.kafka.producer.key-serializer}")private String keySerializer;@Value("${spring.kafka.producer.value-serializer}")private String valueSerializer;@Value("${spring.kafka.producer.security.protocol}")private String securityProtocol;@Value("${spring.kafka.producer.ssl.truststore.location}")private Resource sslTruststoreLocationResource;@Value("${spring.kafka.producer.ssl.truststore.password}")private String sslTruststorePassword;@Value("${spring.kafka.producer.sasl.mechanism}")private String saslMechanism;@Value("${spring.kafka.producer.sasl.jaas.config}")private String saslJaasConfig;@SuppressWarnings({"unchecked", "rawtypes"})@Beanpublic KafkaTemplate<String, String> kafkaTemplate() {return new KafkaTemplate(producerFactory());}@SuppressWarnings("unchecked")@Beanpublic ProducerFactory<String, String> producerFactory() {@SuppressWarnings("rawtypes")DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(producerConfigs());// factory.transactionCapable();// factory.setTransactionIdPrefix("transaction-");return factory;}public Map<String, Object> producerConfigs() {Map<String, Object> props = new HashMap<>();props.put("bootstrap.servers", servers);props.put("acks", acks);props.put("retries", retries);props.put("batch.size", batchSize);props.put("linger.ms", lingerMs);props.put("buffer.memory", bufferMemory);props.put("key.serializer", keySerializer);props.put("value.serializer", valueSerializer);props.put("security.protocol", securityProtocol);props.put("sasl.mechanism", saslMechanism);props.put("sasl.jaas.config", saslJaasConfig);// 如果需要更低级别的消息丢失防护,可以启用幂等性props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);// SSL配置props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, "JKS");try {// 将类路径资源转换为临时文件路径InputStream inputStream = sslTruststoreLocationResource.getInputStream();File tempFile = File.createTempFile("client_truststore", ".jks");Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, tempFile.getAbsolutePath());props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, sslTruststorePassword);} catch (IOException e) {throw new RuntimeException("Failed to locate truststore file", e);}return props;}
}

4. application

spring:kafka:producer:bootstrap-servers: n2.ikt.xxx.com:9092, n3.ikt.xxx.com:9092, n4.ikt.xxx.com:9092, n5.ikt.xxx.com:9092, n6.ikt.xxx.com:9092acks: allretries: 3batch-size: 16384linger-ms: 1buffer-memory: 33554432key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializersecurity.protocol: SASL_SSLssl.truststore.location: classpath:client_truststore.jksssl.truststore.password: pwdsasl.mechanism: SCRAM-SHA-512sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf-username' password='pwd';topic:br: mdscinpi.mdscinpi-data.tstmem: mdscinpi.msdcinpi-data.tstfbr: mdscinpi.inpi-data.tstcr: mdscinpi.npi-data.tst

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 7.6数据结构作业
  • 下拉菜单显示年份选项(月份也适用)
  • 深入理解C#中的文件系统I/O操作
  • C++报错无法访问Private
  • 卷积神经网络(CNN)和循环神经网络(RNN) 的区别与联系
  • 适用于 Windows的 5 个最佳 PDF 转 Word 转换器
  • Python爬取豆瓣电影+数据可视化,爬虫教程!
  • Linux: network: openvswitch: disk 访问速度导致不稳定
  • 【CentOS7.6】yum 报错:Could not retrieve mirrorlist http://mirrorlist.centos.org
  • 基于TCP的在线词典系统(分阶段实现)
  • vs+qt5.0 使用poppler-qt5 操作库获取pdf所有文本输出到txt操作
  • B站大课堂-自动化精品视频(个人存档)
  • 【踩坑】探究PyTorch中创建稀疏矩阵的内存占用过大的问题
  • 只需4500字,带你学习Python中7种基础数据类型!
  • 基于单片机的多功能电子时钟的设计
  • [原]深入对比数据科学工具箱:Python和R 非结构化数据的结构化
  • Brief introduction of how to 'Call, Apply and Bind'
  • Centos6.8 使用rpm安装mysql5.7
  • Java 网络编程(2):UDP 的使用
  • js递归,无限分级树形折叠菜单
  • Redis学习笔记 - pipline(流水线、管道)
  • spark本地环境的搭建到运行第一个spark程序
  • SpiderData 2019年2月16日 DApp数据排行榜
  • TypeScript实现数据结构(一)栈,队列,链表
  • 从@property说起(二)当我们写下@property (nonatomic, weak) id obj时,我们究竟写了什么...
  • 基于Javascript, Springboot的管理系统报表查询页面代码设计
  • 记一次删除Git记录中的大文件的过程
  • 名企6年Java程序员的工作总结,写给在迷茫中的你!
  • 浅谈web中前端模板引擎的使用
  • 文本多行溢出显示...之最后一行不到行尾的解决
  • 进程与线程(三)——进程/线程间通信
  • ​Java基础复习笔记 第16章:网络编程
  • ​决定德拉瓦州地区版图的关键历史事件
  • ‌U盘闪一下就没了?‌如何有效恢复数据
  • # 手柄编程_北通阿修罗3动手评:一款兼具功能、操控性的电竞手柄
  • # 数论-逆元
  • #Linux(make工具和makefile文件以及makefile语法)
  • ()、[]、{}、(())、[[]]等各种括号的使用
  • (1)Nginx简介和安装教程
  • (13)Latex:基于ΤΕΧ的自动排版系统——写论文必备
  • (ISPRS,2023)深度语义-视觉对齐用于zero-shot遥感图像场景分类
  • (二)c52学习之旅-简单了解单片机
  • (回溯) LeetCode 131. 分割回文串
  • (排序详解之 堆排序)
  • (十二)devops持续集成开发——jenkins的全局工具配置之sonar qube环境安装及配置
  • (十七)Flink 容错机制
  • (十三)Java springcloud B2B2C o2o多用户商城 springcloud架构 - SSO单点登录之OAuth2.0 根据token获取用户信息(4)...
  • (限时免费)震惊!流落人间的haproxy宝典被找到了!一切玄妙尽在此处!
  • (已解决)Bootstrap精美弹出框模态框modal,实现js向modal传递数据
  • (原)记一次CentOS7 磁盘空间大小异常的解决过程
  • .Net core 6.0 升8.0
  • .NET Core 中插件式开发实现
  • .NET 中的轻量级线程安全
  • .net获取当前url各种属性(文件名、参数、域名 等)的方法
  • .Net通用分页类(存储过程分页版,可以选择页码的显示样式,且有中英选择)