当前位置: 首页 > news >正文

C#实现数据采集系统-数据反写(1)MQTT订阅接收消息

C#实现数据采集系统-数据反写

实现步骤

  1. MQTT订阅,接收消息
  2. 反写内容写入通信类,添加到写入队列中 链接-消息内容处理和写入通信类队列
  3. 实现Modbustcp通信写入

具体实现

1.MQTT订阅,接收消息

Mqtt实现采集数据转发

Mqtt控制类增加订阅方法

  1. 增加一个通用的订阅方法,需要的参数是一个主题和一个委托,将主题跟对应的委托方法对应存储,然后再mqtt中订阅,收到对应的主题消息,然后执行对应的方法。
 public void SubscribeTopic(string topic, Action<string, string> topicAction){//订阅}

然后需要一个键值对用于存储这个关系

 private Dictionary<string, Action<string, string>> _topicActions;

订阅方法实现:订阅主题,添加到_topicActions,如果已经连接,则直接订阅,没有连接,则等待连上的时候自动订阅,增加锁来确保订阅成功

/// <summary>
/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅
/// </summary>
/// <param name="topic"></param>
/// <param name="topicAction"></param>
public void SubscribeTopic(string topic, Action<string, string> topicAction)
{lock (_topicActionsLock){if (!_topicActions.ContainsKey(topic)){_topicActions.Add(topic, topicAction);if (_mqttClient.IsConnected){_mqttClient.SubscribeAsync(topic);}}}}

在连接方法中,添加订阅

在这里插入图片描述

public void MqttConnect()
{while (!_mqttClient.IsConnected){try{Console.WriteLine($"正在连接……");_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();}catch (Exception ex){Task.Delay(1000).Wait();Console.WriteLine("连接mqtt服务器失败");}}lock (_topicActionsLock){foreach (var item in _topicActions){_mqttClient.SubscribeAsync(item.Key);}}}
  1. 添加接收消息事件
 //客户端接收消息事件_mqttClient.ApplicationMessageReceivedAsync +=MqttClient_ApplicationMessageReceivedAsync;/// <summary>/// 接收消息/// </summary>/// <param name="args"></param>/// <returns></returns>private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){try{Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");if (_topicActions.ContainsKey(args.ApplicationMessage.Topic)){_topicActions[args.ApplicationMessage.Topic].Invoke(args.ApplicationMessage.Topic,Encoding.UTF8.GetString(args.ApplicationMessage.Payload));}}catch (Exception ex){Console.WriteLine(ex.Message);}}

完整Mqtt代码

 public class MqttControllor{private MqttConfig _config;private string _clientId;MqttClientOptions _clientOptions;private IMqttClient _mqttClient;private readonly object _topicActionsLock = new object();private Dictionary<string, Action<string, string>> _topicActions;public MqttControllor(MqttConfig config, bool isAutoConnect = true){_topicActions = new Dictionary<string, Action<string, string>>();_config = config;_clientId = config.ClientId == "" ? Guid.NewGuid().ToString() : config.ClientId;MqttClientOptionsBuilder optionsBuilder = new MqttClientOptionsBuilder().WithTcpServer(_config.Ip, _config.Port).WithCredentials(_config.Username, _config.Password).WithClientId(_clientId);_clientOptions = optionsBuilder.Build();_mqttClient = new MqttFactory().CreateMqttClient();// 客户端连接关闭事件_mqttClient.DisconnectedAsync += MqttClient_DisconnectedAsync;//客户端接收消息事件_mqttClient.ApplicationMessageReceivedAsync +=MqttClient_ApplicationMessageReceivedAsync;if (isAutoConnect){Task.Run(() =>{MqttConnect();});}}/// <summary>/// 接收消息/// </summary>/// <param name="args"></param>/// <returns></returns>private async Task MqttClient_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs args){try{Console.WriteLine($"收到消息:{args.ApplicationMessage.Topic}");if (_topicActions.ContainsKey(args.ApplicationMessage.Topic)){_topicActions[args.ApplicationMessage.Topic].Invoke(args.ApplicationMessage.Topic,Encoding.UTF8.GetString(args.ApplicationMessage.Payload));}}catch (Exception ex){Console.WriteLine(ex.Message);}}private Task MqttClient_DisconnectedAsync(MqttClientDisconnectedEventArgs arg){Console.WriteLine($"客户端已断开与服务端的连接……");//断开重连_mqttClient = new MqttFactory().CreateMqttClient();MqttConnect();return Task.CompletedTask;}public void MqttConnect(){while (!_mqttClient.IsConnected){try{Console.WriteLine($"正在连接……");_mqttClient.ConnectAsync(_clientOptions).GetAwaiter().GetResult();}catch (Exception ex){Task.Delay(1000).Wait();Console.WriteLine("连接mqtt服务器失败");}}Console.WriteLine($"客户端已连接到服务端……");//连接成功,订阅主题lock (_topicActionsLock){foreach (var item in _topicActions){_mqttClient.SubscribeAsync(item.Key);}}}/// <summary>/// 订阅主题,添加到_topicActions,如果已经连接,则直接订阅/// </summary>/// <param name="topic"></param>/// <param name="topicAction"></param>public void SubscribeTopic(string topic, Action<string, string> topicAction){lock (_topicActionsLock){if (!_topicActions.ContainsKey(topic)){_topicActions.Add(topic, topicAction);if (_mqttClient.IsConnected){_mqttClient.SubscribeAsync(topic);}}}}/// <summary>/// 推送消息/// </summary>/// <param name="topic">主题</param>/// <param name="data">消息内容</param>/// <param name="qsLevel"></param>/// <param name="retain"></param>public void Publish(string topic, string data, int qsLevel = 0, bool retain = false){qsLevel = Math.Clamp(qsLevel, 0, 2);if (!_mqttClient.IsConnected){throw new Exception("mqtt未连接");}var message = new MqttApplicationMessage{Topic = topic,PayloadSegment = Encoding.UTF8.GetBytes(data),QualityOfServiceLevel = (MqttQualityOfServiceLevel)qsLevel,Retain = retain // 服务端是否保留消息。true为保留,如果有新的订阅者连接,就会立马收到该消息。};_mqttClient.PublishAsync(message);}}

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 【回文数判断】输入一个5位数,判断它是不是回文数
  • springboot后端开发-自定义参数校验器
  • Win 10高效录屏指南:四大神器助力屏幕录制!
  • python学习之redis
  • openpyxl库详细介绍十分全面
  • QT-五子棋游戏
  • JavaScript 中的深拷贝新宠:structuredClone() 函数详解
  • 网络安全之xss靶场练习
  • 用py获取显卡的占用率
  • 【解压即玩】PC电脑版【漫威蜘蛛侠2】豪华中文版+通关存档+画质超棒,神作
  • MySQL——单表查询(二)按条件查询(11)OR 和 AND 关键字一起使用的情况
  • Springcloud从零开始--Eureka(一)
  • 九、 系统安全(考点篇)试题
  • 无人机飞手培训:考证、组装、维修技术详解
  • 深信服技术服务工程师面试全过程分享
  • 【翻译】Mashape是如何管理15000个API和微服务的(三)
  • Git 使用集
  • Git学习与使用心得(1)—— 初始化
  • JavaScript设计模式与开发实践系列之策略模式
  • JS专题之继承
  • Mysql5.6主从复制
  • React-redux的原理以及使用
  • React组件设计模式(一)
  • Spring Cloud Feign的两种使用姿势
  • TypeScript实现数据结构(一)栈,队列,链表
  • 笨办法学C 练习34:动态数组
  • 从零开始的无人驾驶 1
  • 翻译 | 老司机带你秒懂内存管理 - 第一部(共三部)
  • 技术攻略】php设计模式(一):简介及创建型模式
  • 将 Measurements 和 Units 应用到物理学
  • 开发了一款写作软件(OSX,Windows),附带Electron开发指南
  • 前端性能优化——回流与重绘
  • 浅析微信支付:申请退款、退款回调接口、查询退款
  • 如何用vue打造一个移动端音乐播放器
  • 深度学习入门:10门免费线上课程推荐
  • 腾讯大梁:DevOps最后一棒,有效构建海量运营的持续反馈能力
  • 我看到的前端
  • gunicorn工作原理
  • python最赚钱的4个方向,你最心动的是哪个?
  • ​3ds Max插件CG MAGIC图形板块为您提升线条效率!
  • ​zookeeper集群配置与启动
  • ​补​充​经​纬​恒​润​一​面​
  • ​用户画像从0到100的构建思路
  • $.ajax()方法详解
  • (c语言)strcpy函数用法
  • (webRTC、RecordRTC):navigator.mediaDevices undefined
  • (ZT)出版业改革:该死的死,该生的生
  • (安卓)跳转应用市场APP详情页的方式
  • (二) 初入MySQL 【数据库管理】
  • (附源码)springboot“微印象”在线打印预约系统 毕业设计 061642
  • (全部习题答案)研究生英语读写教程基础级教师用书PDF|| 研究生英语读写教程提高级教师用书PDF
  • (收藏)Git和Repo扫盲——如何取得Android源代码
  • (顺序)容器的好伴侣 --- 容器适配器
  • (算法)大数的进制转换
  • (转)关于多人操作数据的处理策略