当前位置: 首页 > news >正文

Kafka问题排查(消费者自动关闭)

问题描述:
           在消费端能够正常消费到Kafka数据并成功生产到producer topic 中,当将kafka的一台机器关机之后,正常情况下应该是 消费端是不受影响的。因为有还有两台的负载机器。问题就是一台机器停止运行之后,消费端酒 shutdown  而无法重新starting
解决方式 : 
在如下代码中。
 1      public void run(){
 2              try{
 3                   System. out.println( "Consumer....");
 4                   Map<String,Integer> topicCountMap = new HashMap<String,Integer>();
 5                    topicCountMap.put( topic, partitionNum);
 6                   Map<String,List<KafkaStream< byte[], byte[]>>> consumerMap = consumer.createMessageStreams( topicCountMap);
 7                   List<KafkaStream< byte[], byte[]>> partitions = consumerMap.get( topic);
 8 
 9                    threadPool = Executors. newCachedThreadPool();
10                    for(KafkaStream< byte[], byte[]> partition : partitions){
11                          threadPool.execute( new MessageFetcher(partition,producer ));
12                   }
13             } catch(Exception ex){
14                    logger.info( "KafkaConsumer-> Run -> ErrInfo : " +ex.getMessage());
15                   close();
16             }
17       }

有一个 partitionNum,在代码中的可配置值为 private int partitionNum = 3;  

把partitionNum 改为 1 即可解决此问题。

问题跟踪源码分析:

partitionNUm 改为 1 , 此处的Num 为ThreadNum ,因为kafka内部实现中,都为多线程, partition为1时,此时有一个backingQueue1,三个fetch thread 线程,该topic分布在几个node上就有几个 fetch thread 每个fetch thread 会于kafka broker建立一个连接,3个fetch thread线程去拉去消息数据,最终防盗blockingQueue中,等到consumer thread来消费。

转载于:https://www.cnblogs.com/DeepLearing/p/5641454.html

相关文章:

  • 手把手之STM32GPIO
  • iOS 读取plist 方法
  • json恶补
  • mybatis实战教程(mybatis in action)之三:实现数据的增删改查
  • 在代码中修改TextView的DrawableRight图片
  • MongoDB-基础-limit-skip-sort
  • Python 安装模块 模块搜索路径
  • Android第三十天
  • nbsp
  • [改善Java代码]非稳定排序推荐使用List
  • Round 0: Regionals 2010 :: NEERC Eastern Subregional
  • 远程桌面卡
  • 二、ssh 协议:SSH 验证协议 —— 公钥认证
  • 安装程序时出现2502 2503错误解决方法
  • html5配合css3实现带提示文字的输入框(摆脱js)
  • 10个最佳ES6特性 ES7与ES8的特性
  • axios请求、和返回数据拦截,统一请求报错提示_012
  • CSS3 变换
  • ES学习笔记(10)--ES6中的函数和数组补漏
  • JavaScript的使用你知道几种?(上)
  • Mac转Windows的拯救指南
  • magento2项目上线注意事项
  • MySQL-事务管理(基础)
  • NSTimer学习笔记
  • Service Worker
  • vue数据传递--我有特殊的实现技巧
  • 半理解系列--Promise的进化史
  • 码农张的Bug人生 - 见面之礼
  • 爬虫模拟登陆 SegmentFault
  • 如何使用 JavaScript 解析 URL
  • 扫描识别控件Dynamic Web TWAIN v12.2发布,改进SSL证书
  • 提升用户体验的利器——使用Vue-Occupy实现占位效果
  • 详解移动APP与web APP的区别
  • mysql 慢查询分析工具:pt-query-digest 在mac 上的安装使用 ...
  • # Pytorch 中可以直接调用的Loss Functions总结:
  • #!/usr/bin/python与#!/usr/bin/env python的区别
  • ### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  • #etcd#安装时出错
  • #pragma pack(1)
  • (c语言版)滑动窗口 给定一个字符串,只包含字母和数字,按要求找出字符串中的最长(连续)子串的长度
  • (Java实习生)每日10道面试题打卡——JavaWeb篇
  • (超详细)语音信号处理之特征提取
  • (接口自动化)Python3操作MySQL数据库
  • (四)鸿鹄云架构一服务注册中心
  • (转)Mysql的优化设置
  • (转)setTimeout 和 setInterval 的区别
  • .bat批处理(四):路径相关%cd%和%~dp0的区别
  • .net 获取url的方法
  • .NET/C# 使用 #if 和 Conditional 特性来按条件编译代码的不同原理和适用场景
  • .NET轻量级ORM组件Dapper葵花宝典
  • .NET中的Exception处理(C#)
  • .Net组件程序设计之线程、并发管理(一)
  • .vollhavhelp-V-XXXXXXXX勒索病毒的最新威胁:如何恢复您的数据?
  • /usr/local/nginx/logs/nginx.pid failed (2: No such file or directory)
  • [ IO.File ] FileSystemWatcher