当前位置: 首页 > news >正文

Python 消费Kafka手动提交 批量存入Elasticsearch

一、第三方包选择

pip install kafka,对比了kafka和pykafka,还是选择kafka,消费速度更快
pip install elasticsearch==7.12.0(ES版本)

二、创建es连接对象

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulkclass Create_ES(object):_instance = Nonedef __new__(cls, *args, **kwargs):if cls._instance is None:cls._instance = super().__new__(cls)return cls._instancedef __init__(self, hosts):try:self.es = Elasticsearch([{'host':host, 'port':9200}])except Exception as e:print('Connect ES Fail db:{} error:{}'.format(hosts, str(e)))def get_conn(self):return self.esdef set_multi_data(self, datas):'''批量插入数据'''success = bulk(self.es, datas, raise_on_error=True)return success

三、消费kafka数据

from kafka import KafkaConsumer, TopicPartition, OffsetAndMetadata
from . import Create_ESclass AppKfkConsumer(object):def __init__(self):self.server = 'localhost:9092'self.topic = KAFKA_TOPICself.consumer = Noneself.tp = Noneself.consumer_timeout_ms = 5000  # 设置消费超时时间,self.type = 'members'self.group_id = 'test1'  # 设置消费group_id,避免重复消费self.es_index = 'index'  # es的indexdef get_connect(self):self.consumer = KafkaConsumer(group_id=self.group_id,auto_offset_reset='earliest',  # 从最早的数据开始消费bootstrap_servers=self.server,enable_auto_commit=False,  # 关闭自动提交consumer_timeout_ms=self.consumer_timeout_ms)self.tp = TopicPartition(topic=self.topic, partition=0)  # 设置我们要消费的分区self.consumer.assign([self.tp])  # 由consumer对象分配分区def beginConsumer(self):now_offset = 0  # 当前偏移量es_conn = Create_ES()Actions = []while True:for message in self.consumer:now_offset = message.offset  # 获取当前偏移量data = eval(message.value.decode())  # 解析数据action = {"_index": self.es_index,"_type": self.type,"_source": data}Actions.append(action)if len(Actions) >= 50000:result = es_conn.set_multi_data(Actions)  # 批量插入数据Actions = []# 提交偏移量,now_offset+1的原因是因为我发现如果不加1,下次消费会从上次消费最后一条数据开始,重复消费self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})if len(Actions) > 0:result = es_conn.set_multi_data(Actions)Actions = []self.consumer.commit(offsets={tp:(OffsetAndMetadata(now_offset+1, None))})def delconnect(self):self.consumer.close()# 执行任务
ks = AppKfkConsumer()
ks.get_connect()
ks.beginConsumer()

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 微信小程序-使用Component方法代替Page方法构造页面
  • Profinet转EtherNet/IP协议转化网关(功能与配置)
  • DT浏览器使用教程之如何写书法笔记
  • mysql特殊字符、生僻字存储设置
  • idea-springboot后端所有@注释含义汇总-持续更新!
  • C++ 学习补充 1:短链算法
  • 高效掌握芯片设计技术的不二选择
  • 【redis】一致性hash算法和hash槽
  • 测试面试宝典(三十四)—— token是做什么用的?
  • Linux Vim教程:多文件编辑与窗口管理
  • Unity3D 转换微信小游戏指引 05 广告内购
  • 鸿蒙HarmonyOS开发:多种内置弹窗及自定义弹窗的详细使用指南
  • Vscode——如何快速搜索项目工程中的某个文件的位置
  • 关于STM32 UART4串口通信出现的N个问题的解决
  • 科技与占星的融合:AI 智能占星师
  • 《Javascript数据结构和算法》笔记-「字典和散列表」
  • Android开发 - 掌握ConstraintLayout(四)创建基本约束
  • Android组件 - 收藏集 - 掘金
  • HTML中设置input等文本框为不可操作
  • javascript 哈希表
  • Javascript设计模式学习之Observer(观察者)模式
  • JWT究竟是什么呢?
  • node-glob通配符
  • Python十分钟制作属于你自己的个性logo
  • react-core-image-upload 一款轻量级图片上传裁剪插件
  • Redis 中的布隆过滤器
  • Redux系列x:源码分析
  • Vue官网教程学习过程中值得记录的一些事情
  • 对超线程几个不同角度的解释
  • 分布式熔断降级平台aegis
  • 基于Android乐音识别(2)
  • 聚类分析——Kmeans
  • 区块链分支循环
  • 使用Swoole加速Laravel(正式环境中)
  • 数组大概知多少
  • 微服务框架lagom
  • 微信小程序填坑清单
  • 我从编程教室毕业
  • 在electron中实现跨域请求,无需更改服务器端设置
  • const的用法,特别是用在函数前面与后面的区别
  • 机器人开始自主学习,是人类福祉,还是定时炸弹? ...
  • 如何正确理解,内页权重高于首页?
  • ​configparser --- 配置文件解析器​
  • ​LeetCode解法汇总2583. 二叉树中的第 K 大层和
  • #{} 和 ${}区别
  • $nextTick的使用场景介绍
  • (Charles)如何抓取手机http的报文
  • (Matlab)基于蝙蝠算法实现电力系统经济调度
  • (八)Flask之app.route装饰器函数的参数
  • (多级缓存)缓存同步
  • (二)十分简易快速 自己训练样本 opencv级联lbp分类器 车牌识别
  • (附源码)php投票系统 毕业设计 121500
  • (附源码)计算机毕业设计ssm电影分享网站
  • (面试必看!)锁策略
  • (贪心 + 双指针) LeetCode 455. 分发饼干