当前位置: 首页 > news >正文

Kafka基础入门-代码实操

   Kafka是基于发布/订阅模式的消息队列,消息的生产和消费都需要指定主题,因此,我们想要实现消息的传递,第一步必选是创建一个主题(Topic)。下面我们看下在命令行和代码中都是如何创建主题和实现消息的传递的。

使用命令行操作Kafka

使用命令行操作主题

  • 使用kafka-topics.sh脚本来实现对Topic的操作
sh kafka-topics.sh

   执行命令之后,我们可以找到到下面这行提示,REQUIRED代表必须的,就是说我们想要实现对Kafak的操作必须要带有这个参数,表示我们要连接的Kafka具体服务。

--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect  connect to>                              to.   

   接下来,就让我们创建一个主题吧。

# --bootstrap-server  用于指定我们连接的Kafka服务地址,9092是默认端口号  
# --topic  指定要操作的Topic名称  
# --create 表示本次是要创建一个主题  
sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test  --create
# 执行结果
Created topic test.

   查看下我们的主题是否创建成功

sh kafka-topics.sh --bootstrap-server localhost:9092 --list    
# 执行结果
test

   查看某一个主题的详细信息

sh kafka-topics.sh --bootstrap-server localhost:9092 --topic test  --describe  
# 执行结果
Topic: test	TopicId: ehyjS3R3Saq8Cx2V1x0p7g	PartitionCount: 1	ReplicationFactor: 1	Configs: segment.bytes=1073741824Topic: test	Partition: 0	Leader: 1	Replicas: 1	Isr: 1

使用命令行消费数据

  • 我们通过kafka-console-consumer.sh来生产消息。
 sh kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test#输出
hello kafka

使用命令行生产数据

  • 我们通过kafka-console-consumer.sh来生产消息。
sh kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test  
# 输入
>hello kafka

   想了解如何启动Kafka,可以看这篇文章《Kafka基础入门》。

使用代码操作Kafka

   添加依赖包

 <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.7.1</version></dependency></dependencies>

生产者代码

        // 创建配置对象Map<String,Object> configMap = new HashMap<>();configMap.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");// 对生产的数据的K,V 进行序列化的操作configMap.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());configMap.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSelection.class.getName());// 创建生产者对象//      生产者需要设定泛型:数据类型的约束KafkaProducer<String,String> producer = new KafkaProducer<String,String>(configMap);// 创建数据//      构建数据时,需要传递三个参数//          第一个参数表示主题名称,主题不存在时会自动创建//          第二个参数表示数据的Key//          第二个参数表示数据的ValueProducerRecord<String,String> record = new ProducerRecord<String,String>("test","key","value");// 通过生产者对象,将数据发送到Kafkaproducer.send(record);//关闭生产者对象producer.close();

消费者代码

        // 创建配置对象Map<String,Object> configMap = new HashMap<>();configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"lcoalhost:9092");// 对生产的数据的K,V 进行反序列化的操作configMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());configMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSelection.class.getName());configMap.put(ConsumerConfig.GROUP_ID_CONFIG,"com.kafka.test");// 创建消费者对象KafkaConsumer<String,String> kafkaConsumer = new KafkaConsumer<String,String>(configMap);//订阅主题kafkaConsumer.subscribe(Collections.singleton("test"));// 从Kafka中获取数据//      消费者从Kafka中拉取数据ConsumerRecords<String, String> datas = kafkaConsumer.poll(1000);datas.forEach(data ->{System.out.println(data);});// 关闭消费者对象kafkaConsumer.close();

在这里插入图片描述
点击下方名片,关注『编程青衫客』
随时随地获取最新好文章!

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 易懂的吉文斯(Givens)变换(一)
  • 如何使用Gunicorn配置SSL/TLS加密Web服务
  • 序列化与反序列化及不同序列化方式的性能对比
  • 第四章 Redis(2023版本IDEA)
  • SVN 分支管理深入解析
  • 机器人三定律及伦理分析
  • 通过 PPPOE 将 linux 服务器作为本地局域网 IPv4 外网网关
  • Zookeeper-数据结构
  • 优化Cocos Creator 包体体积
  • IDEA启动Web项目总是提示端口占用
  • VsCode远程ssh连接失败:Could not establish connection to XXX
  • Vue3学习体验(一)
  • Reinforced Causal Explainer for GNN论文笔记
  • python基础语法 005 函数1-2 函数作用域
  • Linux - 基础开发工具(yum、vim、gcc、g++、make/Makefile、git)
  • 《Java编程思想》读书笔记-对象导论
  • 【跃迁之路】【641天】程序员高效学习方法论探索系列(实验阶段398-2018.11.14)...
  • Brief introduction of how to 'Call, Apply and Bind'
  • es6要点
  • Golang-长连接-状态推送
  • input实现文字超出省略号功能
  • Js实现点击查看全文(类似今日头条、知乎日报效果)
  • mysql 5.6 原生Online DDL解析
  • ReactNative开发常用的三方模块
  • Redis 中的布隆过滤器
  • Vue 动态创建 component
  • 百度贴吧爬虫node+vue baidu_tieba_crawler
  • 道格拉斯-普克 抽稀算法 附javascript实现
  • 给新手的新浪微博 SDK 集成教程【一】
  • 构造函数(constructor)与原型链(prototype)关系
  • 关于springcloud Gateway中的限流
  • 每天一个设计模式之命令模式
  • 前端面试题总结
  • 浅析微信支付:申请退款、退款回调接口、查询退款
  • 如何将自己的网站分享到QQ空间,微信,微博等等
  • 携程小程序初体验
  • 正则学习笔记
  • 主流的CSS水平和垂直居中技术大全
  • Linux权限管理(week1_day5)--技术流ken
  • scrapy中间件源码分析及常用中间件大全
  • 如何在招聘中考核.NET架构师
  • ​LeetCode解法汇总2583. 二叉树中的第 K 大层和
  • ​queue --- 一个同步的队列类​
  • ​油烟净化器电源安全,保障健康餐饮生活
  • #我与Java虚拟机的故事#连载08:书读百遍其义自见
  • #我与Java虚拟机的故事#连载17:我的Java技术水平有了一个本质的提升
  • #在 README.md 中生成项目目录结构
  • (7) cmake 编译C++程序(二)
  • (day 2)JavaScript学习笔记(基础之变量、常量和注释)
  • (leetcode学习)236. 二叉树的最近公共祖先
  • (rabbitmq的高级特性)消息可靠性
  • (Redis使用系列) Springboot 使用redis实现接口Api限流 十
  • (大众金融)SQL server面试题(1)-总销售量最少的3个型号的车及其总销售量
  • (二)Linux——Linux常用指令
  • (二)什么是Vite——Vite 和 Webpack 区别(冷启动)