当前位置: 首页 > news >正文

大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用

点一下关注吧!!!非常感谢!!持续更新!!!

目前已经更新到了:

  • Hadoop(已更完)
  • HDFS(已更完)
  • MapReduce(已更完)
  • Hive(已更完)
  • Flume(已更完)
  • Sqoop(已更完)
  • Zookeeper(已更完)
  • HBase(已更完)
  • Redis (已更完)
  • Kafka (正在更新…)

章节内容

上节我们完成了:

  • topics.sh、producer.sh、consumer.sh 脚本的基本使用
  • pom.xml 配置
  • JavaAPI的使用:producer 和 consumer

在这里插入图片描述

架构图

上节已经出现过了,这里再放一次
在这里插入图片描述

POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>springboot-kafka</artifactId><version>1.0-SNAPSHOT</version><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.2.2.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>

配置文件

我们常见的配置文件如下图:

spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: my-groupauto-offset-reset: earliestkey-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproducer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializertemplate:default-topic: my-topic

Producer

编写代码

编写了一个KafkaProducerController
里边写了两个方法,都是使用了 KafkaTemplate 的工具。

@RestController
public class KafkaProducerController {@Resourceprivate KafkaTemplate<Integer, String> kafkaTemplate;@RequestMapping("/sendSync/{message}")public String sendSync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 1, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);try {SendResult<Integer, String> result = future.get();System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());} catch (Exception e) {e.printStackTrace();}return "Success";}@RequestMapping("/sendAsync/{message}")public String sendAsync(@PathVariable String message) {ProducerRecord<Integer, String> record = new ProducerRecord<>("wzk_topic_test", 0, 2, message);ListenableFuture<SendResult<Integer, String>> future = kafkaTemplate.send(record);future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {@Overridepublic void onFailure(Throwable ex) {System.out.println("发送失败!");ex.printStackTrace();}@Overridepublic void onSuccess(SendResult<Integer, String> result) {System.out.println("发送成功");System.out.println(result.getProducerRecord().key() + "->" +result.getProducerRecord().partition() + "->" +result.getProducerRecord().timestamp());}});return "Success";}}

测试结果

http://localhost:8085/sendSync/wzktest1
http://localhost:8085/sendAsync/wzktest2
http://localhost:8085/sendAsync/wzktest222222

我们观察控制台的效果如下:
在这里插入图片描述

Consumer

编写代码

编一个类来实现Consumer:

@Configuration
public class KafkaConsumer {@KafkaListener(topics = {"wzk_topic_test"})public void consume(ConsumerRecord<Integer, String> consumerRecord) {System.out.println(consumerRecord.topic() + "\t"+ consumerRecord.partition() + "\t"+ consumerRecord.offset() + "\t"+ consumerRecord.key() + "\t"+ consumerRecord.value());}}

测试运行

2024-07-12 13:48:46.831  INFO 15352 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=wzk-test] Setting offset for partition wzk_topic_test-0 to the committed offset FetchPosition{offset=13, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=h121.wzk.icu:9092 (id: 0 rack: null), epoch=0}}
2024-07-12 13:48:46.926  INFO 15352 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : wzk-test: partitions assigned: [wzk_topic_test-0]
wzk_topic_test	0	13	1	wzktest
wzk_topic_test	0	14	2	wzktest222
wzk_topic_test	0	15	2	wzktest222222

控制台的截图如下:
在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 麒麟V10系统统一认证子系统国际化
  • 大厂linux面试题攻略四之Linux网络服务(二)
  • usb驱动描述符数据结构
  • <数据集>工程机械识别数据集<目标检测>
  • extern关键字在C语言中的作用
  • 【python】三种方式实现将2个3×5数组拼接形成6×5数组
  • 了解消息中间件TongLINK/Q
  • 实验5-1 使用函数计算两点间的距离
  • 以西门子winCC为代表的组态界面,还是有很大提升空间的。
  • 【C++】文件IO流
  • 涨点发论文神器:即插即用多尺度融合模块!
  • web以及nginx
  • 【网络世界】HTTPS协议
  • 《程序猿入职必会(5) · CURD 页面细节规范 》
  • logstash 全接触
  • 【MySQL经典案例分析】 Waiting for table metadata lock
  • android高仿小视频、应用锁、3种存储库、QQ小红点动画、仿支付宝图表等源码...
  •  D - 粉碎叛乱F - 其他起义
  • ES2017异步函数现已正式可用
  • pdf文件如何在线转换为jpg图片
  • Python 基础起步 (十) 什么叫函数?
  • 包装类对象
  • 从setTimeout-setInterval看JS线程
  • 机器人定位导航技术 激光SLAM与视觉SLAM谁更胜一筹?
  • 基于OpenResty的Lua Web框架lor0.0.2预览版发布
  • 计算机在识别图像时“看到”了什么?
  • 少走弯路,给Java 1~5 年程序员的建议
  • 深度学习入门:10门免费线上课程推荐
  • 实现菜单下拉伸展折叠效果demo
  • 移动端唤起键盘时取消position:fixed定位
  • 移动互联网+智能运营体系搭建=你家有金矿啊!
  • 阿里云重庆大学大数据训练营落地分享
  • ​Python 3 新特性:类型注解
  • #### go map 底层结构 ####
  • #define
  • $forceUpdate()函数
  • (+4)2.2UML建模图
  • (C语言)深入理解指针2之野指针与传值与传址与assert断言
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (九十四)函数和二维数组
  • (一)C语言之入门:使用Visual Studio Community 2022运行hello world
  • (转)Google的Objective-C编码规范
  • (自用)learnOpenGL学习总结-高级OpenGL-抗锯齿
  • .mat 文件的加载与创建 矩阵变图像? ∈ Matlab 使用笔记
  • .net core MVC 通过 Filters 过滤器拦截请求及响应内容
  • .Net Web窗口页属性
  • .NET中使用Redis (二)
  • .vue文件怎么使用_vue调试工具vue-devtools的安装
  • /etc/sudoer文件配置简析
  • /ThinkPHP/Library/Think/Storage/Driver/File.class.php  LINE: 48
  • /usr/lib/mysql/plugin权限_给数据库增加密码策略遇到的权限问题
  • @RequestParam详解
  • [30期] 我的学习方法
  • [383] 赎金信 js
  • [Algorithm][动态规划][路径问题][不同路径][不同路径Ⅱ][珠宝的最高价值]详细讲解