当前位置: 首页 > news >正文

golang kafka sarama 源码解析

  • 消费者组重平衡

github.com/!shopify/sarama@v1.27.2/consumer_group.go

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {// 获取broker组协调器coordinator, err := c.client.Coordinator(c.groupID)if err != nil {if retries <= 0 {return nil, err}return c.retryNewSession(ctx, topics, handler, retries, true)}// 申请加入组// Join consumer groupjoin, err := c.joinGroupRequest(coordinator, topics)if err != nil {_ = coordinator.Close()return nil, err}switch join.Err {case ErrNoError:c.memberID = join.MemberIdcase ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, true)// 已经在重平衡期间case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, join.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, join.Err}// 消费者组中的一个消费者作为leader,进行分区方案分配// Prepare distribution plan if we joined as the leadervar plan BalanceStrategyPlanif join.LeaderId == join.MemberId {members, err := join.GetMembers()if err != nil {return nil, err}// 分配分区plan, err = c.balance(members)if err != nil {return nil, err}}// 同步给kafka,只有 leader会带上分区方案// Sync consumer groupgroupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)if err != nil {_ = coordinator.Close()return nil, err}switch groupRequest.Err {case ErrNoError:case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediatelyc.memberID = ""return c.newSession(ctx, topics, handler, retries)case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refreshif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, true)case ErrRebalanceInProgress: // retry after backoffif retries <= 0 {return nil, groupRequest.Err}return c.retryNewSession(ctx, topics, handler, retries, false)default:return nil, groupRequest.Err}// Retrieve and sort claimsvar claims map[string][]int32 // topic->partions// 如果有可消费的分区if len(groupRequest.MemberAssignment) > 0 {members, err := groupRequest.GetMemberAssignment()if err != nil {return nil, err}claims = members.Topicsc.userData = members.UserDatafor _, partitions := range claims {sort.Sort(int32Slice(partitions))}}return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
  • 消费者拉取消息
func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {bc := &brokerConsumer{consumer:         c,broker:           broker,input:            make(chan *partitionConsumer),newSubscriptions: make(chan []*partitionConsumer),wait:             make(chan none),subscriptions:    make(map[*partitionConsumer]none),refs:             0,}go withRecover(bc.subscriptionManager)go withRecover(bc.subscriptionConsumer)return bc
}

相关文章:

  • 基于SpringBoot后端实现连接MySQL数据库并存贮数据
  • Linux 在线yum安装: PostgreSQL 15.6数据库
  • 【Leetcode】单链表常见题
  • 蓝桥杯练习题总结(三)线性dp题(摆花、数字三角形加强版)
  • Gitlab的流水线任务【实现每小时自动测试 dev分支的更新】
  • 大数据Hadoop生态圈体系视频课程
  • uniapp自定义导航栏左中右内容和图标,以及点击事件
  • 【Docker】Docker资源(创建容器)CPU/内存/磁盘IO/GPU限制与分配教程
  • IOS面试题编程机制 6-10
  • 【Java程序设计】【C00373】基于(JavaWeb)Springboot的社区疫情返乡管控系统(有论文)
  • 【阅读笔记】《一个聪明的投资者》
  • LODGE 学习笔记
  • volatile,synchronized,reentranlock,CAS详解
  • go env 命令详解
  • TouchGFX之Button
  • [笔记] php常见简单功能及函数
  • 【前端学习】-粗谈选择器
  • 4月23日世界读书日 网络营销论坛推荐《正在爆发的营销革命》
  • MD5加密原理解析及OC版原理实现
  • OpenStack安装流程(juno版)- 添加网络服务(neutron)- controller节点
  • OSS Web直传 (文件图片)
  • python学习笔记 - ThreadLocal
  • react-native 安卓真机环境搭建
  • spring学习第二天
  • SwizzleMethod 黑魔法
  • 关于 Cirru Editor 存储格式
  • 聊聊flink的TableFactory
  • 探索 JS 中的模块化
  • 提升用户体验的利器——使用Vue-Occupy实现占位效果
  • 通过获取异步加载JS文件进度实现一个canvas环形loading图
  • 一个6年java程序员的工作感悟,写给还在迷茫的你
  • 异常机制详解
  • d²y/dx²; 偏导数问题 请问f1 f2是什么意思
  • k8s使用glusterfs实现动态持久化存储
  • 东超科技获得千万级Pre-A轮融资,投资方为中科创星 ...
  • #经典论文 异质山坡的物理模型 2 有效导水率
  • (cljs/run-at (JSVM. :browser) 搭建刚好可用的开发环境!)
  • (附源码)springboot家庭财务分析系统 毕业设计641323
  • (附源码)springboot助农电商系统 毕业设计 081919
  • (每日持续更新)信息系统项目管理(第四版)(高级项目管理)考试重点整理第3章 信息系统治理(一)
  • (三)Pytorch快速搭建卷积神经网络模型实现手写数字识别(代码+详细注解)
  • (三)终结任务
  • (实战篇)如何缓存数据
  • (四)docker:为mysql和java jar运行环境创建同一网络,容器互联
  • ***监测系统的构建(chkrootkit )
  • .NET CLR基本术语
  • .NET Core 将实体类转换为 SQL(ORM 映射)
  • .Net(C#)自定义WinForm控件之小结篇
  • .NET使用HttpClient以multipart/form-data形式post上传文件及其相关参数
  • .pyc文件是什么?
  • @angular/cli项目构建--Dynamic.Form
  • @我的前任是个极品 微博分析
  • [ 云计算 | AWS 实践 ] Java 如何重命名 Amazon S3 中的文件和文件夹
  • [1525]字符统计2 (哈希)SDUT
  • [2023-年度总结]凡是过往,皆为序章