当前位置: 首页 > news >正文

Kafka消费者api编写教程

1.基本属性配置

输入new Properties().var 回车

//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");

2.创建消费者

输入new KafkaConsumer<String,String>(properties).var 回车选择消费者名称

//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

3.订阅主题/分区

3.1订阅主题

   输入new ArrayList<String,String>().var 回车修改变量名为topics

        //创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);

3.2订阅分区

    输入new ArrayList<TopicPartition>().var 回车选择变量名为topicsPartitions

4.消费数据

//消费数据while (true){//if (flag  == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for回车,进行对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}

5.运行MyConsumer,通过生产者api发送消息

输出台上可以看到输出的都是订阅的主题/分区的信息

6.完整代码

package com.ljr.kafka.replay;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class MyConsumer {public static void main(String[] args) {//创建属性Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"node1:9092,node2:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//指定消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"KK");//创建消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);/*//订阅主题//创建一个数组列表变量接收topics值ArrayList<String> topics = new ArrayList<>();//指定要订阅的主题topics.add("customers");//订阅主题kafkaConsumer.subscribe(topics);*///订阅分区//创建一个数组列表变量接收主题分区值ArrayList<TopicPartition> topicPartitions = new ArrayList<>();//指定要订阅的分区topicPartitions.add(new TopicPartition("customers",2));//订阅分区kafkaConsumer.assign(topicPartitions);//消费数据while (true){//if (flag  == true) flag 标志位置//break;//}生产中退出循环的位置;ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));//将消费的信息输出到控制台,输入consumerRecords.for 回车 对consumerRecords循环遍历for (ConsumerRecord<String,String> consumerRecord : consumerRecords){System.out.println(consumerRecord);}}}
}

相关文章:

  • 网络安全(黑客)2024小白自学必看
  • HttpClient4使用连接池
  • 从零手写实现 nginx-20-placeholder 占位符 $
  • 谈谈微服务之间的授权方案
  • chrome 您的连接不是私密连接
  • 一条sql的执行流程
  • doc 和 docx 文件的区别
  • 基于YOLOv8的行人检测项目的实现
  • 2024 年 5 月区块链游戏研报:市值增长、玩家参与变迁、迷你游戏兴起
  • WPF界面设计
  • 夹层辊能否解决智能测径仪量程不足的问题?
  • Vulnhub-DC-3
  • MAC系统下Xcode连接iOS真机实现iOS App自动化测试(上)
  • 如何在 Windows 上安装 MySQL(保姆级教程2024版)
  • 404 页面代码
  • [NodeJS] 关于Buffer
  • 2017前端实习生面试总结
  • 4个实用的微服务测试策略
  • codis proxy处理流程
  • Git 使用集
  • Hibernate【inverse和cascade属性】知识要点
  • interface和setter,getter
  • miniui datagrid 的客户端分页解决方案 - CS结合
  • Spring-boot 启动时碰到的错误
  • vue-cli在webpack的配置文件探究
  • 百度小程序遇到的问题
  • 仿天猫超市收藏抛物线动画工具库
  • 开发基于以太坊智能合约的DApp
  • 爬虫进阶 -- 神级程序员:让你的爬虫就像人类的用户行为!
  • 删除表内多余的重复数据
  • 微信小程序设置上一页数据
  • 小试R空间处理新库sf
  • 携程小程序初体验
  • 你对linux中grep命令知道多少?
  • 【运维趟坑回忆录 开篇】初入初创, 一脸懵
  • Android开发者必备:推荐一款助力开发的开源APP
  • 阿里云ACE认证之理解CDN技术
  • 没有任何编程基础可以直接学习python语言吗?学会后能够做什么? ...
  • ‌内网穿透技术‌总结
  • # 数据结构
  • ## 临床数据 两两比较 加显著性boxplot加显著性
  • #pragam once 和 #ifndef 预编译头
  • (01)ORB-SLAM2源码无死角解析-(56) 闭环线程→计算Sim3:理论推导(1)求解s,t
  • (4) openssl rsa/pkey(查看私钥、从私钥中提取公钥、查看公钥)
  • (NO.00004)iOS实现打砖块游戏(十二):伸缩自如,我是如意金箍棒(上)!
  • (pojstep1.3.1)1017(构造法模拟)
  • (Redis使用系列) Springboot 实现Redis消息的订阅与分布 四
  • (八)Flink Join 连接
  • (二十一)devops持续集成开发——使用jenkins的Docker Pipeline插件完成docker项目的pipeline流水线发布
  • (附源码)springboot高校宿舍交电费系统 毕业设计031552
  • (附源码)ssm基于web技术的医务志愿者管理系统 毕业设计 100910
  • (附源码)ssm经济信息门户网站 毕业设计 141634
  • (剑指Offer)面试题41:和为s的连续正数序列
  • (三)Honghu Cloud云架构一定时调度平台
  • (原創) 是否该学PetShop将Model和BLL分开? (.NET) (N-Tier) (PetShop) (OO)