当前位置: 首页 > news >正文

Kafka消息队列python开发环境搭建

目录

引言

Kafka 的核心概念和组件

Kafka 的主要特性

使用场景

申请云服务器

安装docker及docker-compose

VSCODE配置

开发环境搭建

搭建Kafka的python编程环境

Kafka的python编程示例

引言

Apache Kafka 是一个分布式流处理平台,由 LinkedIn 开发并在 2011 年贡献给 Apache 软件基金会。虽然 Kafka 常被归类为消息队列(也称为消息传递系统或消息中间件),但它实际上提供了比传统消息队列更丰富的功能,特别是在处理大规模数据流方面。Kafka 最初被设计用于处理 LinkedIn 的高吞吐量日志数据,但现在已广泛应用于各种场景,包括网站活动跟踪、日志收集、实时分析、监控数据聚合以及流处理等。

Kafka 的核心概念和组件
  1. Broker(代理):Kafka 集群中的服务器被称为 broker。每个 broker 都可以独立地处理来自生产者的数据,并响应消费者的请求。

  2. Topic(主题):Kafka 中的消息被分类存储在名为 topic 的容器中。每个 topic 可以有多个分区(partition),每个分区都有序地存储消息。

  3. Partition(分区):分区是 Kafka 中实现水平扩展和容错的关键。每个分区可以分布在不同的 broker 上,同时每个分区内的消息都是有序的。

  4. Producer(生产者):生产者负责向 Kafka 集群发送消息到指定的 topic。生产者可以指定消息的键(key),Kafka 使用这个键来决定消息被发送到哪个分区。

  5. Consumer(消费者):消费者从 Kafka 集群订阅 topic 并消费数据。Kafka 支持多个消费者群组(consumer group)同时消费同一个 topic,每个消费者群组内的消费者可以共同分担处理数据的任务。

  6. Consumer Group(消费者群组):同一个消费者群组内的消费者可以并行地消费同一个 topic 的不同分区,但每个分区只能被一个消费者群组内的一个消费者消费,以确保消息的有序性。

  7. Offset(偏移量):Kafka 中的每条消息都有一个唯一的偏移量,用于标识消息在分区中的位置。消费者通过记录自己消费到的偏移量来跟踪消息的读取进度。

Kafka 的主要特性
  • 高吞吐量:Kafka 被设计用来处理高吞吐量的数据,可以轻松处理成千上万条消息/秒。
  • 可扩展性:Kafka 的集群可以通过增加更多的 broker 来水平扩展,以处理更大的数据量和更高的吞吐量。
  • 持久性:Kafka 通过将消息存储在磁盘上来保证消息的持久性,即使在服务器故障的情况下也不会丢失数据。
  • 容错性:Kafka 提供了强大的容错机制,包括自动的副本复制和数据冗余,以确保数据的可靠性和可用性。
  • 实时性:Kafka 支持实时数据处理,使得它可以用于构建实时流处理应用程序。
使用场景
  • 消息传递:作为传统的消息队列使用,支持解耦的生产者和消费者。
  • 网站活动跟踪:收集和分析用户的点击流、搜索查询等网站活动数据。
  • 日志收集:从分布式系统中收集日志数据,用于监控和分析。
  • 实时分析:对实时数据流进行实时处理和分析,以支持实时决策。
  • 事件流处理:处理实时事件流,如传感器数据、金融交易数据等。

申请云服务器

(以京东云为例,阿里云、腾讯云、华为云、天翼云类似)

注意在选择操作系统的时候选择ubuntu22.04或ubuntu20.04

管理员账户root

管理员密码:在安装的时候设置,记住密码

下载安装mobaXterm

https://mobaxterm.mobatek.net/download-home-edition.html

安装docker及docker-compose

#以下只安装一次即可!
sudo apt update
sudo apt install -y docker.io # intel x86_64
sudo curl -SL https://github.com/docker/compose/releases/download/v2.21.0/docker-compose-linux-x86_64 \-o /usr/local/bin/docker-compose
# 如果github不能访问,可用hub.njuu.cf或521github.com/镜像站替换github.com重试
sudo chmod +x /usr/local/bin/docker-compose #如报错,去掉sudo重试
sudo ln -s /usr/local/bin/docker-compose /usr/bin/docker-compose #如报错,去掉sudo重试

查看docker和dockers是否安装好?

docker version
docker-compose version

VSCODE配置

开发环境搭建

  • 将如下文件保存为docker-compose.yml,并上传至服务器,例如/home/ubuntu/iiot/kafka

  • 将下面代码中的localhost替换为云服务公网IP

version: '3'
services:zookeeper:image: confluentinc/cp-zookeeper:latestenvironment:ZOOKEEPER_CLIENT_PORT: 2181ports:- "2181:2181"
dkafka:image: confluentinc/cp-kafka:latestcontainer_name: kafka-devdepends_on:- zookeeperenvironment:KAFKA_BROKER_ID: 1KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1ports:- "9092:9092"
  • 上传metaldigi.zip

解压缩文件

sudo apt updatesudo apt install zipunzip metaladigi.zip

cd metaldigi/kafka
  • 启动kafka消息服务器

    • 在命令行执行消息生产者

docker-compose up -d

搭建Kafka的python编程环境

  • 进入metaldigi文件夹

  • 执行 docker ps

cd metaldigi
docker-compose up -d

Kafka的python编程示例

  • 进入metaldigi文件夹

  • 执行 docker ps

 docker exec -it metal-digi-backend bash

消息生产者

from kafka import KafkaProducer# 创建一个 Kafka 生产者实例# 这里指定了 Kafka 服务器的地址和端口
producer = KafkaProducer(bootstrap_servers='150.158.11.142:9092')# 循环发送 10 条消息到 'demo-topic' 主题
for_in range(10):
# 将要发送的消息转换为字节格式message =f'message{_}'.encode('utf-8')# 发送消息到 Kafka 的 'demo-topic' 主题producer.send('demo-topic', value=message)# 打印已发送的消息print(f'Sent message: message{_}')# 关闭生产者实例
producer.close()这段代码创建一个 Kafka 生产者,用于向 Kafka 集群发送消息。它循环发送10条消息到名为 'demo-topic' 的主题。每条消息都是一个简单的文本字符串,转换为字节格式后发送。

消息消费者

from kafka import KafkaConsumer# 创建一个 Kafka 消费者实例
# 指定 Kafka 服务器地址、端口以及其他一些配置
consumer = KafkaConsumer('demo-topic',  # 指定要消费的主题bootstrap_servers='150.158.11.142:9092',  # Kafka 服务器地址和端口auto_offset_reset='earliest',  # 从最早的消息开始消费enable_auto_commit=True,  # 自动提交偏移量group_id='demo-group'  # 消费者组标识
)# 循环消费并打印收到的消息
for message in consumer:# 解码并打印消息内容print(f"Received message: {message.value.decode()}")

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 微软的vscode和vs2022快捷键官网链接
  • 【漏洞复现】Rejetto HTTP文件服务器——远程命令执行(CVE-2024-23692)
  • 微信小程序加载动画文件
  • html(抽奖设计)
  • Qt 多语言
  • 解决spring boot中使用拦截器导致swagger文档无法访问
  • 数据库内核研发学习之路(三)创建postgres内置函数
  • Linux 安装多个jdk,切换使用
  • OPC通信从入门到精通_2_OPC通信详解和C#客户端编程(OPC基础概念;OPC通信仿真(KepServer作为OPC服务器;使用Modbus Slave和另外软件仿真2个PLC设备);C#程序)
  • Android使用AndServer在安卓设备上搭建服务端(Java)(Kotlin)两种写法
  • 大语言模型LLM
  • 详解python基本语法
  • 每日一练——第四题
  • vue自制表格
  • 什么是TCP
  • @angular/forms 源码解析之双向绑定
  • DataBase in Android
  • extjs4学习之配置
  • iOS编译提示和导航提示
  • JavaScript创建对象的四种方式
  • Java编程基础24——递归练习
  • Linux CTF 逆向入门
  • python大佬养成计划----difflib模块
  • SegmentFault 社区上线小程序开发频道,助力小程序开发者生态
  • tweak 支持第三方库
  • UEditor初始化失败(实例已存在,但视图未渲染出来,单页化)
  • 翻译--Thinking in React
  • 可能是历史上最全的CC0版权可以免费商用的图片网站
  • 聊一聊前端的监控
  • 模仿 Go Sort 排序接口实现的自定义排序
  • 学习JavaScript数据结构与算法 — 树
  • 云栖大讲堂Java基础入门(三)- 阿里巴巴Java开发手册介绍
  • Oracle Portal 11g Diagnostics using Remote Diagnostic Agent (RDA) [ID 1059805.
  • UI设计初学者应该如何入门?
  • #include到底该写在哪
  • #考研#计算机文化知识1(局域网及网络互联)
  • #我与Java虚拟机的故事#连载19:等我技术变强了,我会去看你的 ​
  • (2024,Flag-DiT,文本引导的多模态生成,SR,统一的标记化,RoPE、RMSNorm 和流匹配)Lumina-T2X
  • (C#)Windows Shell 外壳编程系列9 - QueryInfo 扩展提示
  • (MonoGame从入门到放弃-1) MonoGame环境搭建
  • (力扣记录)235. 二叉搜索树的最近公共祖先
  • (一)、python程序--模拟电脑鼠走迷宫
  • .bat批处理(九):替换带有等号=的字符串的子串
  • .bat批处理(六):替换字符串中匹配的子串
  • .NET CLR基本术语
  • .net 前台table如何加一列下拉框_如何用Word编辑参考文献
  • .NET/C# 中设置当发生某个特定异常时进入断点(不借助 Visual Studio 的纯代码实现)
  • .net6 当连接用户的shell断掉后,dotnet会自动关闭,达不到长期运行的效果。.NET 进程守护
  • .net6使用Sejil可视化日志
  • .sys文件乱码_python vscode输出乱码
  • @Builder用法
  • [ 环境搭建篇 ] 安装 java 环境并配置环境变量(附 JDK1.8 安装包)
  • [20150707]外部表与rowid.txt
  • [Android] 240204批量生成联系人,短信,通话记录的APK
  • [Android] Amazon 的 android 音视频开发文档