当前位置: 首页 > news >正文

C#MQTT协议应用

1 ,MQTT介绍:
MQTT详解以及实际操作_mqtt使用-CSDN博客

2,MQTT应用:

C#MQTT编程06--MQTT服务器和客户端(winform版)_c# mqtt服务器-CSDN博客

3,MQTT实例:

 

效果 

代码:

服务端

  public partial class Form1 : Form{Timer timer = new Timer();List<ClientItem> clients = new List<ClientItem>();public Form1(){InitializeComponent();timer.Interval = 500;timer.Tick += Timer_Tick;timer.Start();InitSetting();}private void Timer_Tick(object sender, EventArgs e){toolStripStatusDateTime.Text = DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss");}void InitSetting(){//获取本机所有的IP4地址string[] ips = Dns.GetHostAddresses(Environment.MachineName).Where(item => !item.IsIPv6LinkLocal).Select(item => item.ToString()).ToArray();comboIp_List.DataSource = ips;txtPort.Text = "8000";txtUserName.Text = "sa";txtPwd.Text = "1";}IMqttServer mqttServer;private void btnStart_Click(object sender, EventArgs e){if (btnStart.Text.Equals("开始")){IMqttServerOptions options = new MqttServerOptionsBuilder().WithConnectionBacklog(10).WithDefaultEndpointBoundIPAddress(IPAddress.Parse(comboIp_List.Text.Trim())).WithDefaultEndpointPort(Convert.ToInt32(txtPort.Text.Trim())).WithDefaultCommunicationTimeout(TimeSpan.FromMilliseconds(5000)).WithConnectionValidator(context =>{//验证if (ck_UserPwd.Checked){if (!context.Username.Equals(txtUserName.Text)){context.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;}if (!context.Password.Equals(txtPwd.Text)){context.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;}}else{context.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.Success;}}).Build();mqttServer = new MqttFactory().CreateMqttServer();mqttServer.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(ApplicationMessageReceived);//服务开始事件mqttServer.StartedHandler = new MqttServerStartedHandlerDelegate(MqttServer_StartedAsync);//服务停止事件mqttServer.StoppedHandler = new MqttServerStoppedHandlerDelegate(MqttServer_StoppedAsync);//客户端连接事件mqttServer.ClientConnectedHandler = new MqttServerClientConnectedHandlerDelegate(MqttServer_ClientConnectedAsync);//客户端断开连接事件mqttServer.ClientDisconnectedHandler = new MqttServerClientDisconnectedHandlerDelegate(MqttServer_ClientDisconnectedAsync);//客户端订阅事件mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedTopicHandlerDelegate(MqttServer_ClientSubscribedTopicAsync);//客户端取消订阅事件mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttServer_ClientUnsubscribedTopicAsync);mqttServer.StartAsync(options);if (mqttServer.IsStarted){btnStart.Text = "断开";btnStart.BackColor = Color.Green;comboIp_List.Enabled = txtPort.Enabled = false;}}else{mqttServer.StopAsync().Wait();if (!mqttServer.IsStarted){btnStart.Text = "开始";btnStart.BackColor = Color.Red;comboIp_List.Enabled = txtPort.Enabled = true;}mqttServer.Dispose();}}private Task MqttServer_ClientUnsubscribedTopicAsync(MqttServerClientUnsubscribedTopicEventArgs arg){//客户端取消订阅事件string msg = $"客户端ID:[{arg.ClientId}] 取消主题:[{arg.TopicFilter}] 订阅";AddLog(1, msg);//取消订阅ClientItem client = clients.FirstOrDefault(item => item.ClientID.Equals(arg.ClientId));client?.Topics.RemoveAll(item => item.Topic.Equals(arg.TopicFilter));return Task.FromResult(arg);}private Task MqttServer_ClientSubscribedTopicAsync(MqttServerClientSubscribedTopicEventArgs arg){//客户端订阅事件string msg = $"客户端ID:[{arg.ClientId}]订阅主题:[{arg.TopicFilter.Topic}],Qos:[{arg.TopicFilter.QualityOfServiceLevel}]";AddLog(0, msg);TopicItem topic = new TopicItem { QualityOfServiceLevel = arg.TopicFilter.QualityOfServiceLevel, Topic = arg.TopicFilter.Topic };ClientItem client = clients.FirstOrDefault(item => item.ClientID.Equals(arg.ClientId));client?.Topics.Add(topic);return Task.FromResult(arg);}private Task MqttServer_ClientDisconnectedAsync(MqttServerClientDisconnectedEventArgs arg){//客户端断开事件string msg = $"客户端ID:[{arg.ClientId}] 断开连接,EndPoint:{arg.Endpoint},断开连接类型:{arg.DisconnectType}";AddLog(1, msg);//移除该Clientclients.RemoveAll(item => item.ClientID.Equals(arg.ClientId));//更新ListBoxUpdateClientList();return Task.FromResult(arg);}private Task MqttServer_ClientConnectedAsync(MqttServerClientConnectedEventArgs arg){//客户端连接事件string msg = $"客户端ID:[{arg.ClientId}]连接 ,EndPoint:{arg.Endpoint},UserName: {arg.UserName}";AddLog(0, msg);ClientItem client = new ClientItem{ClientID = arg.ClientId,EndPoint = arg.Endpoint};clients.Add(client);//更新ListBoxUpdateClientList();return Task.FromResult(arg);}private Task ApplicationMessageReceived(MqttApplicationMessageReceivedEventArgs arg){//客户端MessageReceived事件string msg = $"收到消息:发送者ID:[{arg.ClientId}] QoS:{arg.ApplicationMessage.QualityOfServiceLevel} topic:{arg.ApplicationMessage.Topic}, 内容:{arg.ApplicationMessage.ConvertPayloadToString()}";AddLog(1, msg);return Task.FromResult(arg);}private Task MqttServer_StoppedAsync(EventArgs arg){//服务停止事件string msg = $"服务停止";AddLog(1, msg);clients.Clear();return Task.FromResult(arg);}private Task MqttServer_StartedAsync(EventArgs arg){//服务开启事件string msg = $"服务开启";AddLog(0, msg);return Task.FromResult(arg);}private void ck_UserPwd_CheckedChanged(object sender, EventArgs e){txtUserName.Enabled = txtPwd.Enabled = ck_UserPwd.Checked;}void AddLog(int cate, string msg){if (lstview_Note.InvokeRequired){this.Invoke(new Action(() =>{ListViewItem item = new ListViewItem();item.ImageIndex = cate;item.SubItems.Add(DateTime.Now.ToString("HH:mm:ss"));item.SubItems.Add(msg);lstview_Note.Items.Add(item);lstview_Note.EnsureVisible(lstview_Note.Items.Count - 1);toolStripStatusConnectCount.Text = clients.Count.ToString();}));}else{ListViewItem item = new ListViewItem();item.ImageIndex = cate;item.SubItems.Add(DateTime.Now.ToString("HH:mm:ss"));item.SubItems.Add(msg);lstview_Note.Items.Add(item);lstview_Note.EnsureVisible(lstview_Note.Items.Count - 1);toolStripStatusConnectCount.Text = clients.Count.ToString();}}private void btnCreateClient_Click(object sender, EventArgs e){ClientFrm frm = new ClientFrm();frm.Show();}private void lst_Client_SelectedIndexChanged(object sender, EventArgs e){if (lst_Client.SelectedItem != null){ClientItem item = lst_Client.SelectedItem as ClientItem;lst_Topic.DataSource = item.Topics.Select(a => $"Topic:{a.Topic},QoS:{a.QualityOfServiceLevel}").ToArray();}}/// <summary>/// 更新clientList 列表/// </summary>void UpdateClientList(){this.Invoke(new Action(() => {lst_Client.Items.Clear();foreach (var item in clients){lst_Client.Items.Add(item);}toolStripStatusConnectCount.Text = clients.Count.ToString();}));}}
 class ClientItem{public List<TopicItem> Topics { get; set; } = new List<TopicItem>();public string ClientID { get; set; }public string EndPoint { get; set; }public override string ToString(){return $"{ClientID};{EndPoint}";}}class TopicItem{public string Topic { get; set; }/// <summary>/// 服务质量级别,QoS0 (Almost One):至多一次,只发送一次,会发生消息丢失或重复。QoS1(Atleast Once): 至少一次,确保消息到达,但消息可能重复发送。QoS2(Exactly Once):只有一次,确保消息只到达一次。/// </summary>public MqttQualityOfServiceLevel QualityOfServiceLevel { get; set; }}

客户端

 public partial class ClientFrm : Form{Timer timer = new Timer();public ClientFrm(){InitializeComponent();timer.Interval = 500;timer.Tick += Timer_Tick;timer.Start();InitSetting();}private void Timer_Tick(object sender, EventArgs e){toolStripStatusDateTime.Text = DateTime.Now.ToString("yyyy/MM/dd HH:mm:ss");}void InitSetting(){//获取本机所有的IP4地址string[] ips = Dns.GetHostAddresses(Environment.MachineName).Where(item => !item.IsIPv6LinkLocal).Select(item => item.ToString()).ToArray();comboIp_List.DataSource = ips;txtPort.Text = "8000";txtUserName.Text = "sa";txtPwd.Text = "1";comboQoS.DataSource = Enum.GetNames(typeof(MQTTnet.Protocol.MqttQualityOfServiceLevel));}IMqttClient client;private void btnConnect_Click(object sender, EventArgs e){//  txtClientId.Text = Guid.NewGuid().ToString();if (string.IsNullOrEmpty(txtClientId.Text.Trim())){MessageBox.Show("ClientId不能为空!");return;}if (btnConnect.Text.Equals("连接")){client = new MqttFactory().CreateMqttClient();MqttClientOptionsBuilder builder = new MqttClientOptionsBuilder().WithClientId(txtClientId.Text.Trim()).WithCommunicationTimeout(TimeSpan.FromMilliseconds(50000)).WithTcpServer(comboIp_List.Text.Trim(), Convert.ToInt32(txtPort.Text.Trim()));if (ck_UserPwd.Checked){builder.WithCredentials(txtUserName.Text.Trim(), txtPwd.Text.Trim());}IMqttClientOptions option = builder.Build();//消息接收事件client.ApplicationMessageReceivedHandler = new MqttApplicationMessageReceivedHandlerDelegate(Client_ApplicationMessageReceivedAsync);//客户端连接事件client.ConnectedHandler = new MqttClientConnectedHandlerDelegate(Client_ConnectedAsync);// client.UseConnectedHandler//客户端连接断开事件client.DisconnectedHandler = new MqttClientDisconnectedHandlerDelegate(Client_DisconnectedAsync);client.ConnectAsync(option);}else{client.DisconnectAsync().Wait();UpdateStatus(client.IsConnected);client.Dispose();}}private Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg){//客户端断开事件string msg = $" 断开连接, 返回结果:{arg.Reason}";AddLog(1, msg);UpdateStatus(client.IsConnected);return Task.FromResult(arg);}private Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg){//客户端连接事件string msg = $"客户端ID 连接,结果:{arg.ConnectResult.ResultCode}";AddLog(0, msg);UpdateStatus(client.IsConnected);return Task.FromResult(arg);}private Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg){//客户端取消订阅事件string msg = $"消息ClientId:[{arg.ClientId}] , topic:[{arg.ApplicationMessage.Topic}], 内容:[{arg.ApplicationMessage.ConvertPayloadToString()}],QoS:[{arg.ApplicationMessage.QualityOfServiceLevel}]";AddLog(1, msg);return Task.FromResult(arg);}private void ck_UserPwd_CheckedChanged(object sender, EventArgs e){txtUserName.Enabled = txtPwd.Enabled = ck_UserPwd.Checked;}void AddLog(int cate, string msg){this.Invoke(new Action(() =>{ListViewItem item = new ListViewItem();item.ImageIndex = cate;item.SubItems.Add(DateTime.Now.ToString("HH:mm:ss"));item.SubItems.Add(msg);lstview_Note.Items.Add(item);lstview_Note.EnsureVisible(lstview_Note.Items.Count - 1);}));}private async void btnSubscribed_Click(object sender, EventArgs e){if (string.IsNullOrWhiteSpace(txtTopic.Text)){MessageBox.Show("主题不能为空!");return;}//主题订阅if (client?.IsConnected == true){//QoS0 (Almost One):至多一次,只发送一次,会发生消息丢失或重复。//QoS1(Atleast Once): 至少一次,确保消息到达,但消息可能重复发送//QoS2(Exactly Once):只有一次:只有一次,确保消息只到达一次。var result = await client.SubscribeAsync(new MqttTopicFilter(){QualityOfServiceLevel = (MqttQualityOfServiceLevel)Enum.Parse(typeof(MqttQualityOfServiceLevel), comboQoS.Text.Trim()),Topic = txtTopic.Text.Trim()});StringBuilder sb = new StringBuilder();foreach (var item in result.Items){sb.AppendLine($"订阅主题:[{item.TopicFilter.Topic}],QoS:[{item.TopicFilter.QualityOfServiceLevel}],ReturnCode:[{item.ResultCode}]");}AddLog(0, sb.ToString());}else{MessageBox.Show("请先连接服务!");}}private async void btnPublish_Click(object sender, EventArgs e){if (string.IsNullOrWhiteSpace(txtMsgTopic.Text.Trim())){MessageBox.Show("主题不能为空!");return;}//发布信息if (client?.IsConnected == true){var result = await client.PublishAsync(new MqttApplicationMessage{Topic = txtMsgTopic.Text.Trim(),QualityOfServiceLevel = MqttQualityOfServiceLevel.ExactlyOnce,Payload =Encoding.UTF8.GetBytes( txtMsgContent.Text.Trim())});string msg = $"发布消息:{txtMsgContent.Text} Topic:{txtMsgTopic.Text.Trim()} 结果:{result.ReasonCode}";AddLog(0, msg);}else{MessageBox.Show("请先连接服务!");}}void UpdateStatus(bool isConnected){this.Invoke(new Action(() =>{if (!isConnected){btnConnect.Text = "连接";btnConnect.BackColor = Color.Red;comboIp_List.Enabled = txtPort.Enabled = true;}else{btnConnect.Text = "断开";btnConnect.BackColor = Color.Green;comboIp_List.Enabled = txtPort.Enabled = false;}}));}private async void btnUnsubscribed_Click(object sender, EventArgs e){if (string.IsNullOrWhiteSpace(txtTopic.Text)){MessageBox.Show("主题不能为空!");return;}//主题订阅if (client?.IsConnected == true){//QoS0 (Almost One):至多一次,只发送一次,会发生消息丢失或重复。//QoS1(Atleast Once): 至少一次,确保消息到达,但消息可能重复发送//QoS2(Exactly Once):只有一次:只有一次,确保消息只到达一次。var result = await client.UnsubscribeAsync(txtTopic.Text.Trim());StringBuilder sb = new StringBuilder();foreach (var item in result.Items){sb.AppendLine($"取消主题:[{item.TopicFilter}]订阅,ReturnCode:[{item.ReasonCode}]");}AddLog(0, sb.ToString());}else{MessageBox.Show("请先连接服务!");}}}

demo链接

https://download.csdn.net/download/lingxiao16888/89616009

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 解决idea debug/run 启动项目一闪而过的问题
  • Docker 设置代理
  • vscode+linux+opencv环境配置
  • 使用ollama取代openai的api进行graphRAG失败记录
  • 《Milvus Cloud向量数据库指南》—Milvus Cloud赋能Ivy.ai:解锁大数据潜力,加速AI创新
  • 低代码: 系统开发准备之确定一般开发流程,需求分析,复杂度分析,标准开发流程
  • C#初级——字典Dictionary
  • 【深度学习】什么是混合精度训练?缩放因子 S 的选择
  • 探索编程世界:大学新生的最佳入门路径与学习方法
  • 字节序大小端
  • 无人机之导航系统篇
  • 单片机如何使用超声波传感器进行距离测量
  • 电子合同怎么制作?9款常用电子合同软件
  • Python爬虫新手指南及简单实战
  • 【大模型从入门到精通5】openAI API高级内容审核-1
  • Angular数据绑定机制
  • JavaSE小实践1:Java爬取斗图网站的所有表情包
  • JS函数式编程 数组部分风格 ES6版
  • MySQL主从复制读写分离及奇怪的问题
  • Vue小说阅读器(仿追书神器)
  • 阿里云ubuntu14.04 Nginx反向代理Nodejs
  • 从0实现一个tiny react(三)生命周期
  • 排序(1):冒泡排序
  • 腾讯视频格式如何转换成mp4 将下载的qlv文件转换成mp4的方法
  • 吴恩达Deep Learning课程练习题参考答案——R语言版
  • 验证码识别技术——15分钟带你突破各种复杂不定长验证码
  • 进程与线程(三)——进程/线程间通信
  • ![CDATA[ ]] 是什么东东
  • #设计模式#4.6 Flyweight(享元) 对象结构型模式
  • $.ajax()参数及用法
  • $GOPATH/go.mod exists but should not goland
  • (Java岗)秋招打卡!一本学历拿下美团、阿里、快手、米哈游offer
  • (MTK)java文件添加简单接口并配置相应的SELinux avc 权限笔记2
  • (NSDate) 时间 (time )比较
  • (定时器/计数器)中断系统(详解与使用)
  • (读书笔记)Javascript高级程序设计---ECMAScript基础
  • (二)什么是Vite——Vite 和 Webpack 区别(冷启动)
  • (附源码)apringboot计算机专业大学生就业指南 毕业设计061355
  • (十二)springboot实战——SSE服务推送事件案例实现
  • (转)AS3正则:元子符,元序列,标志,数量表达符
  • .Net Core 笔试1
  • .NET 设计一套高性能的弱事件机制
  • .net 无限分类
  • .net(C#)中String.Format如何使用
  • .NET单元测试
  • .NET的微型Web框架 Nancy
  • /tmp目录下出现system-private文件夹解决方法
  • @Autowired和@Resource装配
  • @RequestParam,@RequestBody和@PathVariable 区别
  • [ MSF使用实例 ] 利用永恒之蓝(MS17-010)漏洞导致windows靶机蓝屏并获取靶机权限
  • [AIGC] Kong:一个强大的 API 网关和服务平台
  • [Android]常见的数据传递方式
  • [C/C++]数据结构 循环队列
  • [C++]AVL树怎么转
  • [c++刷题]贪心算法.N01