当前位置: 首页 > news >正文

python连接kafka生产者发送消息

通过pip install kafka-python安装第三方工具
再导入相应的方法就可以连接kafka进行消息发送了。

from kafka import KafkaProducer, KafkaConsumer
import jsonproducer = KafkaProducer(bootstrap_servers=['xxx.xxx.xxx.xxx:9092','xxx.xxx.xxx.xxx:9092'],security_protocol='SASL_PLAINTEXT',sasl_mechanism='SCRAM-SHA-512',sasl_plain_username = '鉴权账户',sasl_plain_password = '鉴权password',value_serializer = lambda m: json.dumps(m).encode('ascii'))
message = {"test":"test"}
producer.send('topicname', value = message)
producer.flush()

同时发送msg和key时,有时会报错key_bytes不属于字节类型,于是就手动把json内容改成字节型,再发送

from kafka import KafkaProducer, KafkaConsumer
import jsonproducer = KafkaProducer(bootstrap_servers=['xxx.xxx.xxx.xxx:9092','xxx.xxx.xxx.xxx:9092'],security_protocol='SASL_PLAINTEXT',sasl_mechanism='SCRAM-SHA-512',sasl_plain_username = '鉴权账户',sasl_plain_password = '鉴权password')
message = {"test":"test"}
k = {"test":"test"}msg_bytes =  json.dumps(message).encode('ascii')
k_bytes = json.dumps(k).encode('ascii')producer.send('topicname', key = k_bytes, value = msg_bytes)
producer.flush()

遇到过send发送消息超过60000ms的错误,这个问题需要加上鉴权,并且确认topic是正确的。我是加了鉴权,即用账户和密码,并且改正了topic之后,就发送成功了。

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • Memcached vs Redis——Java项目缓存选择
  • 数据结构-C语言-排序(2)
  • excel系列(二) - 利用 easypoi 快速实现 excel 文件导入导出
  • QQ频道导航退出
  • CV09_深度学习模块之间的缝合教学(4)--调参
  • 自定义 Java ClassLoader:深入探索
  • 13 IP层协议-网际控制报文协议ICMP
  • 人工智能算法工程师(中级)课程13-神经网络的优化与设计之梯度问题及优化与代码详解
  • 【正点原子i.MX93开发板试用连载体验】录音小程序采集语料
  • C++客户端Qt开发——常用控件(多元素控件)
  • 数据库管理1
  • 【Linux】centos7安装PHP7.4报错:libzip版本过低
  • 计算机网络入门
  • Ubuntu 磁盘扩容
  • PHP全功能微信投票迷你平台系统小程序源码
  • 分享一款快速APP功能测试工具
  • 《用数据讲故事》作者Cole N. Knaflic:消除一切无效的图表
  • 2018一半小结一波
  • CentOS7简单部署NFS
  • CentOS学习笔记 - 12. Nginx搭建Centos7.5远程repo
  • create-react-app项目添加less配置
  • CSS3 变换
  • Docker容器管理
  • HTTP请求重发
  • JavaScript 奇技淫巧
  • JavaScript对象详解
  • JSDuck 与 AngularJS 融合技巧
  • Python_OOP
  • react-core-image-upload 一款轻量级图片上传裁剪插件
  • spring + angular 实现导出excel
  • SpringCloud(第 039 篇)链接Mysql数据库,通过JpaRepository编写数据库访问
  • tab.js分享及浏览器兼容性问题汇总
  • Traffic-Sign Detection and Classification in the Wild 论文笔记
  • 系统认识JavaScript正则表达式
  • 新版博客前端前瞻
  • 由插件封装引出的一丢丢思考
  • 原生js练习题---第五课
  • 自制字幕遮挡器
  • [Shell 脚本] 备份网站文件至OSS服务(纯shell脚本无sdk) ...
  • # Apache SeaTunnel 究竟是什么?
  • #android不同版本废弃api,新api。
  • (附源码)php新闻发布平台 毕业设计 141646
  • (附源码)基于ssm的模具配件账单管理系统 毕业设计 081848
  • (个人笔记质量不佳)SQL 左连接、右连接、内连接的区别
  • (解决办法)ASP.NET导出Excel,打开时提示“您尝试打开文件'XXX.xls'的格式与文件扩展名指定文件不一致
  • (企业 / 公司项目)前端使用pingyin-pro将汉字转成拼音
  • (十七)Flask之大型项目目录结构示例【二扣蓝图】
  • .net6解除文件上传限制。Multipart body length limit 16384 exceeded
  • .NET性能优化(文摘)
  • .NET应用UI框架DevExpress XAF v24.1 - 可用性进一步增强
  • .vue文件怎么使用_vue调试工具vue-devtools的安装
  • .w文件怎么转成html文件,使用pandoc进行Word与Markdown文件转化
  • [ solr入门 ] - 利用solrJ进行检索
  • [ 云计算 | AWS 实践 ] Java 如何重命名 Amazon S3 中的文件和文件夹
  • [000-01-030].Zookeeper学习大纲