当前位置: 首页 > news >正文

深入浅出分析kafka客户端程序设计 ----- 消费者篇----万字总结

1. Kafka 消费者的逻辑

  • 配置消费者客户端参数。
  • 创建相应的消费者实例。
  • 订阅主题。
  • 拉取消息并消费;
  • 提交消息位移;
  • 关闭消费者实例;

2 Kafka 的C++ API

2.1 RdKafka::Conf

见生成者实现文章。

2.2 RdKafka::Event

见生成者实现文章。

2.3 RdKafka::EventCb

见生成者实现文章。

2.4 RdKafka::TopicPartition

static TopicPartition * create(const std::string &topic, int partition);
//创建一个TopicPartition对象。static TopicPartition *create (const std::string &topic, int partition,int64_t offset);
//创建TopicPartition对象。static void destroy (std::vector<TopicPartition*> &partitions);
//销毁所有TopicPartition对象。const std::string & topic () const;
//返回Topic名称。int partition ();
//返回分区号。int64_t offset();
//返回位移。void set_offset(int64_t offset);
//设置位移。ErrorCode err();
//返回错误码。

2.5 RdKafka::RebalanceCb

virtual void rebalance_cb(RdKafka::KafkaConsumer *consumer, RdKafka::ErrorCode err, std::vector< TopicPartition * >&partitions)=0;

用于RdKafka::KafkaConsunmer的组再平衡回调函数;注册rebalance_cb回调函数会关闭rdkafka的自动分区赋值和再分配并替换应用程序的rebalance_cb回调函数

 再平衡回调函数负责对基于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件更新rdkafka的分区分配,也能处理任意前两者错误除外其它再平衡失败错误。对于RdKafka::ERR_ASSIGN_PARTITIONS和RdKafka::ERR_REVOKE_PARTITIONS事件之外的其它再平衡失败错误,必须调用unassign()同步状态。
没有再平衡回调函数,rdkafka也能自动完成再平衡过程,但注册一个再平衡回调函数可以使应用程序在执行其它操作时拥有更大的灵活性,例如从指定位置获取位移或手动提交位移。


C++封装API:

class ConsumerRebalanceCb : public RdKafka::RebalanceCb
{
private:static void printTopicPartition (const std::vector<RdKafka::TopicPartition*>&partitions)        // 打印当前获取的分区{for (unsigned int i = 0 ; i < partitions.size() ; i++)std::cerr << partitions[i]->topic() <<"[" << partitions[i]->partition() << "], ";std::cerr << "\n";}public:void rebalance_cb (RdKafka::KafkaConsumer *consumer,RdKafka::ErrorCode err,std::vector<RdKafka::TopicPartition*> &partitions){std::cerr << "RebalanceCb: " << RdKafka::err2str(err) << ": ";printTopicPartition(partitions);if (err == RdKafka::ERR__ASSIGN_PARTITIONS){consumer->assign(partitions);partition_count = (int)partitions.size();}else{consumer->unassign();partition_count = 0;}}
private:int partition_count;
};

2.6 RdKafka::Message

见生成者实现文章。

2.7 RdKafka::KafkaConsumer(核心)

KafkaConsumer是高级API,要求Kafka 0.9.0以上版本,当前支持range和roundrobin分区分配策略。

static KafkaConsumer * create(Conf *conf, std::string &errstr);
创建KafkaConsumer对象,conf对象必须配置Consumer要加入的消费者组。使用KafkaConsumer::close()进行关闭。ErrorCode assignment(std::vector< RdKafka::TopicPartition * > &partitions);
返回由RdKafka::KafkaConsumer::assign() 设置的当前分区。ErrorCode subscription(std::vector< std::string > &topics);
返回由RdKafka::KafkaConsumer::subscribe() 设置的当前订阅Topic。ErrorCode subscribe(const std::vector< std::string > &topics);
更新订阅Topic分区。ErrorCode unsubscribe();
将当前订阅Topic取消订阅分区。ErrorCode assign(const std::vector< TopicPartition * > &partitions);
将分配分区更新为partitions。ErrorCode unassign();
停止消费并删除当前分配的分区。Message * consume(int timeout_ms);
消费消息或获取错误事件,触发回调函数,会自动调用注册的回调函数,包括RebalanceCb、EventCb、OffsetCommitCb等。需要使用delete释放消息。应用程序必须确保consume在指定时间间隔内调用,为了执行等待调用的回调函数,即使没有消息。当RebalanceCb被注册时,在需要调用和适当处理内部Consumer同步状态时,确保consume在指定时间间隔内调用极为重要。应用程序必须禁止对KafkaConsumer对象调用poll函数。
如果RdKafka::Message::err()是ERR_NO_ERROR,则返回正常的消息;如果RdKafka::Message::err()是ERR_NO_ERRO,返回错误事件;如果RdKafka::Message::err()是ERR_TIMED_OUT,则超时。ErrorCode commitSync();
提交当前分配分区的位移,同步操作,会阻塞直到位移被提交或提交失败。如果注册了RdKafka::OffsetCommitCb回调函数,其会在KafkaConsumer::consume()函数内调用并提交位移。ErrorCode commitAsync();
异步提交位移。ErrorCode commitSync(Message *message);
基于消息对单个topic+partition对象同步提交位移。virtual ErrorCode commitSync (std::vector<TopicPartition*> &offsets) = 0;
对指定多个TopicPartition同步提交位移。ErrorCode commitAsync(Message *message);
基于消息对单个TopicPartition异步提交位移。virtual ErrorCode commitAsync (const std::vector<TopicPartition*> &offsets) = 0;
对多个TopicPartition异步提交位移。ErrorCode close();
正常关闭,会阻塞直到四个操作完成(触发避免当前分区分配的局部再平衡,停止当前赋值消费,提交位移,离开分组)virtual ConsumerGroupMetadata *groupMetadata () = 0;
返回本Consumer实例的Consumer Group的元数据。ErrorCode position (std::vector<TopicPartition*> &partitions)
获取TopicPartition对象中当前位移,会别填充TopicPartition对象的offset字段。ErrorCode seek (const TopicPartition &partition, int timeout_ms)
定位TopicPartition的Consumer到位移。timeout_ms为0,会开始Seek并立即返回;timeout_ms非0,Seek会等待timeout_ms时间。ErrorCode offsets_store (std::vector<TopicPartition*> &offsets)
为TopicPartition存储位移,位移会在auto.commit.interval.ms时提交或是被手动提交。enable.auto.offset.store属性必须设置为fasle。

3 Kafka 消费者客户端开发


3.1 必要的参数配置(bootstrap.servers)


在创建消费者的时候以下以下三个选项是必选的:

bootstrap.servers:指定 broker (kafka服务器)的地址清单,清单里不需要包含所有的 broker(kafka) 地址,生产者会从给定的 broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错。
group.id:consumer group 是 kafka 提供的可扩展且具有容错性的消费者机制。既然是一个组,那么组内必然可以有多个消费者或消费者实例(consumer instance),它们共享一个公共的 ID,即group ID。组内的所有消费者协调在一起来消费订阅主题(subscribed topics)的所有分区(partition)。
auto.offset.reset:这个参数是针对新的 groupid 中的消费者而言的,当有新 groupid 的消费者来

 

相关文章:

  • 杨志丰:OceanBase助力企业应对数据库转型深水区挑战
  • Python+requests+unittest+excel实现接口自动化测试框架
  • 在VSCode中运行Python脚本文件时如何传参
  • 用Rust刷LeetCode之66 加一
  • 2分钟带你了解什么是Vsync
  • Java爬虫攻略:应对JavaScript登录表单
  • Apache Hive(部署+SQL+FineBI构建展示)
  • Ribbon组件的负载均衡原理
  • 电脑搜不自己的手机热点,其余热点均可!
  • 采样率越高噪声越大?
  • 【redis笔记】分布式锁
  • 【Lidar】基于Python的三维点云数据转二维平面+散点图绘制
  • 2次MD5加密——用于分布式对话
  • Labelme2Yolo labelme格式的json标注转yolo格式txt
  • 【尘缘送书第五期】Java程序员:学习与使用多线程
  • #Java异常处理
  • JSDuck 与 AngularJS 融合技巧
  • SQLServer之创建显式事务
  • 初探 Vue 生命周期和钩子函数
  • 高程读书笔记 第六章 面向对象程序设计
  • 猫头鹰的深夜翻译:Java 2D Graphics, 简单的仿射变换
  • 爬虫进阶 -- 神级程序员:让你的爬虫就像人类的用户行为!
  • 文本多行溢出显示...之最后一行不到行尾的解决
  • 项目实战-Api的解决方案
  • 原生Ajax
  • 3月27日云栖精选夜读 | 从 “城市大脑”实践,瞭望未来城市源起 ...
  • UI设计初学者应该如何入门?
  • 新年再起“裁员潮”,“钢铁侠”马斯克要一举裁掉SpaceX 600余名员工 ...
  • ​520就是要宠粉,你的心头书我买单
  • ​LeetCode解法汇总2304. 网格中的最小路径代价
  • ​LeetCode解法汇总518. 零钱兑换 II
  • ​七周四次课(5月9日)iptables filter表案例、iptables nat表应用
  • # 数据结构
  • (poj1.2.1)1970(筛选法模拟)
  • (附源码)ssm高校社团管理系统 毕业设计 234162
  • (附源码)计算机毕业设计SSM疫情社区管理系统
  • (六)什么是Vite——热更新时vite、webpack做了什么
  • (使用vite搭建vue3项目(vite + vue3 + vue router + pinia + element plus))
  • .NET 5.0正式发布,有什么功能特性(翻译)
  • .net mvc actionresult 返回字符串_.NET架构师知识普及
  • .NET 分布式技术比较
  • .net 流——流的类型体系简单介绍
  • .NET/C# 编译期能确定的字符串会在字符串暂存池中不会被 GC 垃圾回收掉
  • .NET/C# 使用 ConditionalWeakTable 附加字段(CLR 版本的附加属性,也可用用来当作弱引用字典 WeakDictionary)
  • .NET应用架构设计:原则、模式与实践 目录预览
  • .net专家(张羿专栏)
  • @data注解_一枚 架构师 也不会用的Lombok注解,相见恨晚
  • [2010-8-30]
  • [20170705]lsnrctl status LISTENER_SCAN1
  • [Android开源]EasySharedPreferences:优雅的进行SharedPreferences数据存储操作
  • [AutoSar]BSW_Com02 PDU详解
  • [BZOJ 3282] Tree 【LCT】
  • [bzoj1901]: Zju2112 Dynamic Rankings
  • [LeetCode] 197. 上升的温度
  • [linux] 创建用户