当前位置: 首页 > news >正文

kafka集群发送消息报错

2019独角兽企业重金招聘Python工程师标准>>> hot3.png

因为logstash采集的日志要发往kafka做一个队列机制,搭建完kafka集群后发送消息出现问题

ERROR fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:slave4,port:9092)] failed (kafka.utils.Utils$)
kafka.common.KafkaException: fetching topic metadata for topics [Set(order)] from broker [ArrayBuffer(id:0,host:slave4,port:9092)] failed
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
        at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
        at kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:78)
        at kafka.utils.Utils$.swallow(Utils.scala:167)
        at kafka.utils.Logging$class.swallowError(Logging.scala:106)
        at kafka.utils.Utils$.swallowError(Utils.scala:46)
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:78)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
Caused by: java.net.ConnectException: Connection refused
        at sun.nio.ch.Net.connect0(Native Method)
        at sun.nio.ch.Net.connect(Net.java:484)
        at sun.nio.ch.Net.connect(Net.java:476)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:675)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        ... 12 more

ERROR Error in handling batch of 1 events (kafka.producer.async.ProducerSendThread)
kafka.common.FailedToSendMessageException: Failed to send messages after 3 tries.
        at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
        at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
        at kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
        at scala.collection.immutable.Stream.foreach(Stream.scala:547)
        at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
        at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)

因为看到 Caused by: java.net.ConnectException: Connection refused ,所以查看 selinux

vim /etc/selinux/config

# This file controls the state of SELinux on the system.
# SELINUX= can take one of these three values:
#     enforcing - SELinux security policy is enforced.
#     permissive - SELinux prints warnings instead of enforcing.
#     disabled - No SELinux policy is loaded.
SELINUX=disabled
# SELINUXTYPE= can take one of these two values:
#     targeted - Targeted processes are protected,
#     mls - Multi Level Security protection.
SELINUXTYPE=targeted

强制关闭 selinux

setenforce 0

发送消息还是报一样的错

查看zookeeper

[zk: master:2181(CONNECTED) 18] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1465262832441","host":"localhost","version":1,"port":9092}
cZxid = 0x70000005e
ctime = Tue Jun 07 09:27:12 CST 2016
mZxid = 0x70000005e
mtime = Tue Jun 07 09:27:12 CST 2016
pZxid = 0x70000005e
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x255213e787e0002
dataLength = 86
numChildren = 0

host是localhost,而我的kafka集群是slave4、5上,zk在master,slave2、3上,所以分别修改 slave4、5  上的server.properties的 host.name=slave4, host.name=slave5

重新启动kafka,查看zk

[zk: master:2181(CONNECTED) 27] get /brokers/ids/0
{"jmx_port":-1,"timestamp":"1465266328389","host":"slave4","version":1,"port":9092}
cZxid = 0x70000008a
ctime = Tue Jun 07 10:25:27 CST 2016
mZxid = 0x70000008a
mtime = Tue Jun 07 10:25:27 CST 2016
pZxid = 0x70000008a
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x355213e7be80003
dataLength = 83
numChildren = 0

0上的host变成了slave4

在发送消息成功,接收消息成功

 

转载于:https://my.oschina.net/zlhblogs/blog/688407

相关文章:

  • apache2 启用php7.0
  • Android 仿网易新闻v3.5:上下滑动的引导页
  • C语言基础
  • angularjs与require的集成摘抄
  • rsync远程数据备份配置之再次总结
  • HP DL580 G7设置IPMI
  • 面试题与答案
  • Gluon公布完整的Java 9 Mobile创新举措
  • Linux运维(数据库专题)面试题
  • 生产环境提升ssh安全的10种方式
  • 重提敏捷已死
  • AppFabric 版本区分
  • android之SQLlite操作
  • 文件查找和压缩
  • HDU4813 Hard Code
  • 【css3】浏览器内核及其兼容性
  • Android开发 - 掌握ConstraintLayout(四)创建基本约束
  • HTTP传输编码增加了传输量,只为解决这一个问题 | 实用 HTTP
  • HTTP请求重发
  • Iterator 和 for...of 循环
  • laravel5.5 视图共享数据
  • Linux下的乱码问题
  • PHP 程序员也能做的 Java 开发 30分钟使用 netty 轻松打造一个高性能 websocket 服务...
  • php中curl和soap方式请求服务超时问题
  • SOFAMosn配置模型
  • Vim Clutch | 面向脚踏板编程……
  • vue学习系列(二)vue-cli
  • 对象管理器(defineProperty)学习笔记
  • 强力优化Rancher k8s中国区的使用体验
  • 如何选择开源的机器学习框架?
  • 因为阿里,他们成了“杭漂”
  • kubernetes资源对象--ingress
  • $GOPATH/go.mod exists but should not goland
  • ( 用例图)定义了系统的功能需求,它是从系统的外部看系统功能,并不描述系统内部对功能的具体实现
  • (规划)24届春招和25届暑假实习路线准备规划
  • (四)库存超卖案例实战——优化redis分布式锁
  • (转)Sublime Text3配置Lua运行环境
  • (转)德国人的记事本
  • (转)母版页和相对路径
  • (转)自己动手搭建Nginx+memcache+xdebug+php运行环境绿色版 For windows版
  • .[backups@airmail.cc].faust勒索病毒的最新威胁:如何恢复您的数据?
  • .NET CF命令行调试器MDbg入门(一)
  • .NET Core 成都线下面基会拉开序幕
  • .NET 中 GetHashCode 的哈希值有多大概率会相同(哈希碰撞)
  • .net2005怎么读string形的xml,不是xml文件。
  • .NET委托:一个关于C#的睡前故事
  • .NET业务框架的构建
  • [ C++ ] STL---stack与queue
  • [ CTF ] WriteUp-2022年春秋杯网络安全联赛-冬季赛
  • [20180224]expdp query 写法问题.txt
  • [2024] 十大免费电脑数据恢复软件——轻松恢复电脑上已删除文件
  • [ACL2022] Text Smoothing: 一种在文本分类任务上的数据增强方法
  • [AutoSar]BSW_Com07 CAN报文接收流程的函数调用
  • [BUUCTF]-PWN:[极客大挑战 2019]Not Bad解析
  • [C语言]——柔性数组