当前位置: 首页 > news >正文

Python连接Kafka收发数据等操作

目录

一、Kafka

二、发送端(生产者)

三、接收端(消费者)

四、其他操作


一、Kafka

Apache Kafka 是一个开源流处理平台,由 LinkedIn 开发,并于 2011 年成为 Apache 软件基金会的一部分。Kafka 广泛用于构建实时的数据流和流式处理应用程序,它以高吞吐量、可扩展性和容错性著称。

kafka-python 是一个用 Python 编写的 Apache Kafka 客户端库。

安装命令如下:

pip install kafka-python

二、发送端(生产者)

自动创建test主题,并每隔一秒发送一条数据,示例代码如下:

from kafka import KafkaProducer
import json
import time# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaProducer实例
producer = KafkaProducer(bootstrap_servers=bootstrap_servers)# 发送消息的函数
def send_message(topic, message):# 将消息转换为字节producer.send(topic, json.dumps(message).encode('utf-8'))producer.flush()if __name__ == '__main__':# 创建'test'主题topic = 'test'# 发送消息i = 1while True:message = {'num': i, 'msg': f'Hello Kafka {i}'}send_message(topic, message)i += 1time.sleep(1)

三、接收端(消费者)

代码如下:

from kafka import KafkaConsumer
import json# Kafka服务器地址
bootstrap_servers = ['localhost:9092']# 创建KafkaConsumer实例
consumer = KafkaConsumer('test',bootstrap_servers=bootstrap_servers,auto_offset_reset='latest',  # 从最新的消息开始消费# auto_offset_reset='earliest',  # 从最早的offset开始消费enable_auto_commit=True,  # 自动提交offsetgroup_id='my-group'  # 消费者组ID
)# 消费消息
for message in consumer:# 将接收到的消息解码并转换为字典message = json.loads(message.value.decode('utf-8'))print(f"Received message: {message}")

消费者参数如下:

1、auto_offset_reset
该参数指定了当Kafka中没有初始偏移量或当前偏移量在服务器上不再存在时(例如数据被删除了),消费者应从何处开始读取数据。
可选值:
earliest:从最早的记录开始消费,即从分区日志的开始处开始。
latest:从最新的记录开始消费,即从分区日志的末尾开始。(默认)
none:如果没有为消费者指定初始偏移量,就抛出一个异常。

2、enable_auto_commit

该参数指定了消费者是否周期性地提交它所消费的偏移量。自动提交偏移量可以简化消费者的使用,但可能有重复消费或数据丢失的风险。禁用自动提交可以更精确地控制偏移量的提交时机,通常在确保消息处理成功后才提交偏移量。
可选值:
true:自动提交偏移量。(默认)
false:不自动提交偏移量,需要手动调用commitSync()或commitAsync()来提交偏移量。

3、group_id

该参数用于指定消费者所属的消费组。同一个消费组的消费者将共同消费一个主题的不同分区,而不同消费组的消费者可以独立地消费消息,互不影响。这对于实现负载均衡和故障转移很有用。
类型:字符串(必须指定)

四、其他操作

list_topics():获取主题元数据。

create_topics():创建新主题。

delete_topics():删除主题。

from kafka.admin import KafkaAdminClient, NewTopic# 获取主题元数据
admin_client = KafkaAdminClient(bootstrap_servers='localhost:9092', client_id='test')
topics = admin_client.list_topics()
print(topics)# 创建主题
new_topic = NewTopic(name="test-topic", num_partitions=3, replication_factor=1)
admin_client.create_topics(new_topics=[new_topic], validate_only=False)# 删除主题
admin_client.delete_topics(topics=['test-topic'])

相关文章:

  • MySQl查询分析工具 EXPLAIN ANALYZE
  • SpringSecurity -- 入门使用
  • 在某服务中,两方法递归调用导致堆栈溢出
  • 【第十六章:Sentosa_DSML社区版-机器学习之生存分析】
  • “投其所招”-智能投标领军者丨OPENAIGC开发者大赛高校组AI创作力奖|
  • 基于RepLKNet31B模型在RML201610a数据集上的调制识别【代码+数据集+python环境+GUI系统】
  • Rust 全局变量的最佳实践 lazy_static/OnceLock/Mutex/RwLock
  • # linux从入门到精通(三)
  • UDP通信
  • [数据结构] 二叉树题目 (二)
  • 阿博图书馆管理系统:SpringBoot技术应用
  • c语言中的杨氏矩阵的介绍以及元素查找的方法
  • django drf 分页器
  • MP4 格式:前世今生与技术解析
  • HarmonyOS鸿蒙系统开发应用程序,免费开源DevEco Studio开发工具
  • @jsonView过滤属性
  • docker容器内的网络抓包
  • Essential Studio for ASP.NET Web Forms 2017 v2,新增自定义树形网格工具栏
  • gulp 教程
  • iOS | NSProxy
  • Linux后台研发超实用命令总结
  • PHP 的 SAPI 是个什么东西
  • quasar-framework cnodejs社区
  • Spring技术内幕笔记(2):Spring MVC 与 Web
  • Terraform入门 - 3. 变更基础设施
  • 从setTimeout-setInterval看JS线程
  • 二维平面内的碰撞检测【一】
  • 和 || 运算
  • 猫头鹰的深夜翻译:JDK9 NotNullOrElse方法
  • 容器服务kubernetes弹性伸缩高级用法
  • 使用阿里云发布分布式网站,开发时候应该注意什么?
  • 正则表达式小结
  • 仓管云——企业云erp功能有哪些?
  • ​MySQL主从复制一致性检测
  • ​Spring Boot 分片上传文件
  • # 睡眠3秒_床上这样睡觉的人,睡眠质量多半不好
  • (c语言+数据结构链表)项目:贪吃蛇
  • (done) 声音信号处理基础知识(2) (重点知识:pitch)(Sound Waveforms)
  • (Java数据结构)ArrayList
  • (搬运以学习)flask 上下文的实现
  • (笔试题)分解质因式
  • (附源码)springboot家庭装修管理系统 毕业设计 613205
  • (接上一篇)前端弄一个变量实现点击次数在前端页面实时更新
  • (十三)MipMap
  • (五)Python 垃圾回收机制
  • (转载)Linux网络编程入门
  • (自适应手机端)行业协会机构网站模板
  • ./indexer: error while loading shared libraries: libmysqlclient.so.18: cannot open shared object fil
  • .bat文件调用java类的main方法
  • .Net 6.0 处理跨域的方式
  • .NET CF命令行调试器MDbg入门(三) 进程控制
  • .net core 连接数据库,通过数据库生成Modell
  • .Net mvc总结
  • .net 中viewstate的原理和使用
  • .NET/C# 如何获取当前进程的 CPU 和内存占用?如何获取全局 CPU 和内存占用?