当前位置: 首页 > news >正文

Apache Nifi挂接MQTT与Kafka实践

目录

1.  说明:

2. 方案设计:

2.1 资源配置:

2.2 交互Topics:  

3. 实现步骤 

3.1 Nifi 桌面

3.2 MqttToKafka

3.2.1 配置

3.2.2 测试

3.2.3 结果

3.3 KafkaToMqtt

3.3.1 配置 

3.3.1 测试

3.3.1 结果 ​编辑

4. 总结:

4.1 知识点

Nifi Kafka Processor 配置字典:

Topic通配符:

5. 参考:


1.  说明:

      在一些方案实现过程中,感觉需要一种接驳器来连接不同的数据源并汇流到一处进行统一处理,于是寻到NIFI(官网)这个工具,它相当于“数据水管+接驳器工具箱”,能丝滑联结不同的数据源,总体思路是把各类数据源汇流到Kafka集中处理,比如日志文件,消息传递,数据库操作等。初步使用感觉很不错,分享之。

2. 方案设计:

- 连接Emqx集群(mqtt服务)与Kafka集群,实现数据流动的双工运作 - 客户端(连mqtt) <=> 应用服务(连kafka)

2.1 资源配置:

简单起见,在docker环境中实施,后续迁移到K8s

服务集群服务入口备注
MQTT (tcp|mqtt)://host001.dev.ia:1883

client id:

nifi-xio1-sub1 订阅者

nifi-xio1-pub1 发布者

Kafkahost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Apache Nifihttp://host001.dev.ia:9080/nifi/

Nifi的docker配置

# 建个卷,持久化数据
docker volume create nifi_data

docker-compose.yml

version: "3.7"
services:nifi:image: apache/nifi:1.9.2container_name: nifirestart: alwaysports:- "9080:8080"environment:- NIFI_WEB_HTTP_HOST=0.0.0.0#- NIFI_HOME=/home/nifi#- NIFI_LOG_DIR=/home/nifi/logsvolumes:- nifi_data:/home/nifivolumes:nifi_data:external: true
2.2 交互Topics:  
Topic备注
test.topic.nifi1测试接收
test.topic.bus总线

3. 实现步骤 

3.1 Nifi 桌面

配好后,访问​http://host001.dev.ia:9080/nifi/​, 中间是配好的两个Processor Group,分别是MqttToKafka与KafkaToMqtt,代表双向流动配置。

3.2 MqttToKafka
3.2.1 配置

 加ConsumeMQTT Processor:拉Processor组件下去,点开选ConsumeMQTT

Settings备注
NameConsumeMQTT
Automatically terminate relationships

failure / success 勾选

Properties备注
NameConsumeMQTT
Broker URItcp://host001.dev.ia:1883
Client IDnifi-xio1-sub1
Username/Password--
Topic Filtertest.topic.nifi1
Max Queue Size1000

加PublishKafka_2_0 Processor:拉Processor组件下去,点开选PublishKafka_2_0

Properties备注
NamePublishKafka_2_0
Kafka Brokershost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Security ProtocolPLAINTEXT
Topic Nametest.topic.nifi1
Delivery Guarantee

Guarantee Replicated Delivery

Use Transactionstrue

拖动ConsumeMQTT连接PublishKafka, 会添加一个队列连接组件命名为 Message

正确运行如图:

3.2.2 测试

说明:

  1. 用mqtt客户端工具MqttX向topic=tset.topic.nifi1发送json数据包
  2. 用python脚本作为消费者客户端连接kafka,订阅topic=tset.topic.nifi1,获取该数据包

python脚本:

from confluent_kafka import Consumer, KafkaError, KafkaException
import asyncio
import jsonasync def consume_loop(consumer, topics):try:# 订阅主题consumer.subscribe(topics)while True:# 轮询消息msg = consumer.poll(timeout=1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:# End of partition eventprint("%% %s [%d] reached end at offset %d\n"% (msg.topic(), msg.partition(), msg.offset()))elif msg.error():raise KafkaException(msg.error())else:# 正常消息raw_message = msg.value()print(f"Raw message: {raw_message}")parsed_message = json.loads(raw_message.decode("utf-8"))print(f"Received message: {type(parsed_message)} : {parsed_message}")await asyncio.sleep(0.01)  # 小睡片刻,让出控制权finally:# 关闭消费者consumer.close()async def consume():# 消费者配置conf = {"bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092","group.id": "mygroup1","auto.offset.reset": "earliest",}# 创建消费者consumer = Consumer(conf)await consume_loop(consumer, ["tset.topic.nifi1"])if __name__ == "__main__":asyncio.run(consume())
3.2.3 结果

脚本 Nifi 

3.3 KafkaToMqtt
3.3.1 配置 

加ConsumeKafkaProcessor:拉Processor组件下去,点开选ConsumeMQTT

Settings备注
NameConsumeKafka_2_0
Properties备注
Kafka Brokershost001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092
Topic Name(s)test.topic.bus
Group IDtest

加PublishMQTT Processor:拉Processor组件下去,点开选PublishMQTT

Settings备注
NamePublishMQTT
Automatically terminate relationships

failure / success 勾选

Properties备注
Broker URItcp://host001.dev.ia:1883
Client IDnifi-xio1-pub1
Username/Password--
Topic Filtertest.topic.bus
QoS0

 拖动ConsumeKafka_2_0连接PublishMQTT, 会添加一个队列连接组件命名为 Message

正确运行如图:

3.3.1 测试

说明:

  1. python脚本向Kafka发布消息到 topic = test.topic.bus
  2. MqttX客户端订阅接收

脚本

from confluent_kafka import Producer
import jsondef delivery_report(err, msg):"""Called once for each message produced to indicate delivery result.Triggered by poll() or flush()."""if err is not None:print(f"Message delivery failed: {err}")else:print(f"Message delivered to {msg.topic()} [{msg.partition()}]")def create_async_producer(config):"""Creates an instance of an asynchronous Kafka producer."""return Producer(config)def produce_messages(producer, topic, messages):"""Asynchronously produces messages to a Kafka topic."""for message in messages:# Trigger any available delivery report callbacks from previous produce() callsproducer.poll(0)# Asynchronously produce a message, the delivery report callback# will be triggered from poll() above, or flush() below, when the message has# been successfully delivered or failed permanently.producer.produce(topic, json.dumps(message).encode("utf-8"), callback=delivery_report)# Wait for any outstanding messages to be delivered and delivery report# callbacks to be triggered.producer.flush()if __name__ == "__main__":# Kafka configuration# Replace these with your server's configurationconf = {"bootstrap.servers": "host001.dev.ia:19092,host001.dev.ia:29092,host001.dev.ia:39092",  # Replace with your Kafka server addresses# "client.id": "python-producer",}# Create an asynchronous Kafka producerasync_producer = create_async_producer(conf)# Messages to send to Kafkamessages_to_send = [{"key": "value1"}, {"key": "value2"}, {"key": "value3"}]# Produce messagesproduce_messages(async_producer, "test.topic.bus", messages_to_send)
3.3.1 结果 

MqttX 

Nifi 

4. 总结:

       Nifi支持集群化部署,如此从数据采集,数据流动到数据存储都实现了分布式,而且有可视化的界面可方便地进行数据节点的集聚与增减配置,目前只是浅尝即止,更深入的研究待后续不断补充优化。

4.1 知识点
Nifi Kafka Processor 配置字典:
Delivery Guarantee

数据传递保证

  1. Best Effort (尽力交付,相当于ack=0)
  2. Guarantee Single Node Delivery(保证单节点交付,相当于ack=1,Kafka中的默认配置):
  3. Guarantee Replicated Delivery(保证复制交付,相当于ack=-1)
Use Transactions

使用事务 

true / false 

Topic通配符:
“/”

主题层级分隔符

如果存在分隔符,它将主题名分割为多个主题层级。

如:room401/tv/contrl/sensor

“#”多层通配符

匹配主题中任意层级的通配符

如果客户端订阅主题 “china/guangzhou/#”, 它会收到使用下列主题名发布的消息

china/guangzhou china/guangzhou/huangpu china/guangzhou/tianhe/zhongshanlu china/guangzhou/tianhe/zhongshanlu/num123

school/#                //也匹配单独的 “school” ,因为 # 包括它的父级。
#                       //是有效的,会收到所有的应用消息。
school/teacher/#        //有效的。
school/teacher#         //无效的。
school/teacher/#/lever  //无效的,必须是主题过滤器的最后一个字符
https://blog.51cto.com/u_16099203/10959511

“+”单层通配符

单个主题层级匹配的通配符。在主题过滤器的任意层级都可以使用单层通配符,包括第一个和最后一个层级。

china/+ 只能匹配 china/guangzhou

china/+/+/zhongshanlu 能匹配china/guangzhou/tianhe/zhongshanlu和china/shenzhen/nanshan/zhongshanlu

“$”匹配一个字符$xx
/$xx
/xx$

5. 参考:

- https://zhuanlan.zhihu.com/p/697301397

- https://blog.51cto.com/u_16213319/7344183

- Apache NiFi Docker Compose | All About

- https://blog.51cto.com/u_16099203/10959511

- 大数据NiFi(二十一):监控日志文件生产到Kafka-腾讯云开发者社区-腾讯云

- PublishMQTT

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 认知觉醒:铸就非凡人生的进阶之路
  • 单例模式懒汉模式和饿汉模式
  • 2024年【甘肃省安全员B证】考试资料及甘肃省安全员B证模拟试题
  • 探索 SPL-404 协议标准:NFT 与 DeFi 的融合
  • spring框架实现滑动验证码功能
  • Java修炼 Java SE 面试题目 (简答) 2024.7.26 22:16
  • .net core docker部署教程和细节问题
  • 安防视频监控EasyCVR视频汇聚平台修改配置后无法启动的原因排查与解决
  • Java 扫雷游戏
  • Java:swagger/knife4j接口返回的json数据中文显示乱码问号???
  • OpenAI发布GPT-4 Mini的深度分析及中国大模型的弯道超车机会
  • 【Golang 面试基础题】每日 5 题(十)
  • 基于上云api前端开发经验教训(loading...)
  • 基于python的BP神经网络回归模型
  • RT-Thread Studio搭建 Renesa Version Board开发环境
  • 自己简单写的 事件订阅机制
  • [case10]使用RSQL实现端到端的动态查询
  • CSS 专业技巧
  • es6(二):字符串的扩展
  • ES6之路之模块详解
  • express.js的介绍及使用
  • Git同步原始仓库到Fork仓库中
  • Java,console输出实时的转向GUI textbox
  • Python学习之路13-记分
  • SAP云平台运行环境Cloud Foundry和Neo的区别
  • Sequelize 中文文档 v4 - Getting started - 入门
  • Spring Cloud Feign的两种使用姿势
  • 测试如何在敏捷团队中工作?
  • 每天一个设计模式之命令模式
  • 如何用Ubuntu和Xen来设置Kubernetes?
  • 腾讯视频格式如何转换成mp4 将下载的qlv文件转换成mp4的方法
  • 网络应用优化——时延与带宽
  • 一个SAP顾问在美国的这些年
  • 用Canvas画一棵二叉树
  • 原生 js 实现移动端 Touch 滑动反弹
  • 走向全栈之MongoDB的使用
  • 2017年360最后一道编程题
  • ​iOS安全加固方法及实现
  • #70结构体案例1(导师,学生,成绩)
  • #Linux杂记--将Python3的源码编译为.so文件方法与Linux环境下的交叉编译方法
  • $NOIp2018$劝退记
  • (4)(4.6) Triducer
  • (4)Elastix图像配准:3D图像
  • (二十九)STL map容器(映射)与STL pair容器(值对)
  • (含react-draggable库以及相关BUG如何解决)固定在左上方某盒子内(如按钮)添加可拖动功能,使用react hook语法实现
  • (每日持续更新)jdk api之FileFilter基础、应用、实战
  • (免费领源码)Python#MySQL图书馆管理系统071718-计算机毕业设计项目选题推荐
  • (七)Activiti-modeler中文支持
  • (七)glDrawArry绘制
  • (三)mysql_MYSQL(三)
  • (微服务实战)预付卡平台支付交易系统卡充值业务流程设计
  • (转)memcache、redis缓存
  • (转)原始图像数据和PDF中的图像数据
  • .bat批处理(二):%0 %1——给批处理脚本传递参数
  • .NET Core SkiaSharp 替代 System.Drawing.Common 的一些用法