当前位置: 首页 > news >正文

SpringBoot(Java)实现MQTT连接(本地Mosquitto)通讯调试

1.工作及使用背景

        工作中需要跟收集各种硬件或传感器数据用于Web展示及统计计算分析,如电表、流量计、泵、控制器等物联网设备。

        目前的思路及解决策略是使用力控或者杰控等组态软件实现数据的转储(也会涉及收费问题),通过组态软件自带的转储工具将数据转储到关系型数据库,如MySQL、sqlLite、Postgresql等。然后在BS架构后台程序中通过定时刷数据或者查询时计算的方式进行统计分析计算。

        但上述解决方案实际上是实现简单,但是数据统计时机有潜在的偏差风险,且逻辑设计非常别扭,数据库压力大等问题,理论上应该通过消息队列来接收实时数据参与计算的方式,Web系统只负责展示计算统计之后的结果,这样无论是时效还是数据准确性更容易保证,实时数据存储的数据库压力也不存在(可做数据校验用,也可不用),逻辑也不显别扭。

2.开发环境及工具

JDK1.8、maven、Mosquitto、IDEA、postman

3.框架结构及文件声明

因为我用的现成的框架,所以启动模块和业务模块分开了。实际开发调试中完全可以放一起也没关系。

MqttClientConnectorPool对外提供一个初始化的Mqtt客户端,在服务启动时初始化
MqttMsgSender对外提供一个可以执行消息发送的方法
MqttMsgSubscriber初始化一个Mqtt客户端,并根据配置订阅topic
TestController接收web请求的调用消息发送,用于测试
BusinessApplicationStartup服务启动时执行,调用MqttClientConnectorPool初始化一个客户端并调起MqttMsgSubscriber的监听等待
BusinessApplicationShutdown服务正常终止时调用,关闭服务启动默认创建的Mqtt客户端
MqttBrokerServerSpringBoot服务启动类

4.具体实现逻辑及代码

4.1 maven依赖

<properties><MQTTv3.version>1.2.5</MQTTv3.version>
</properties><dependencyManagement><dependencies><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>${MQTTv3.version}</version></dependency></dependencies>
</dependencyManagement>或者直接使用
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.MQTTv3</artifactId><version>1.2.5</version>
</dependency>

4.2 MqttClientConnectorPool

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttClientConnectorPool {public static MqttClient mqttClient;/*** 连接MQTT客户端* @return 获取MQTT连队对象*/public static MqttClient connectMQTT() {if (mqttClient != null){log.info("已存在,我深深的脑海!");return mqttClient;}try {// broker及连接信息String broker = "tcp://127.0.0.1:1883";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";//创建MQTT客户端(指定broker、客户端id、消息持久策略)mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());//创建连接参数配置MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);//连接超时时间options.setKeepAliveInterval(20);//是否自动重连options.setAutomaticReconnect(true);mqttClient.connect(options);log.info("MqttClient 服务启动broker初始化!");} catch (MqttException e){log.error("MqttClient connect Error:{}", e.getMessage());e.printStackTrace();}return mqttClient;}/*** 关闭MQTT客户端* @param client client*/public static void closeClient(MqttClient client){try {// 断开连接client.disconnect();// 关闭客户端client.close();} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}/*** 关闭MQTT客户端*/public static void closeStaticClient(){try {if (mqttClient != null){// 断开连接mqttClient.disconnect();// 关闭客户端mqttClient.close();}} catch (MqttException e){log.error("MqttClient disconnect or close Error:{}", e.getMessage());e.printStackTrace();}}
}

4.3 MqttMsgSender

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;@Slf4j
public class MqttMsgSender {public void sendMessage(MqttClient client,String topic,String content,int qos){MqttMessage message = new MqttMessage(content.getBytes());message.setQos(qos);try{client.publish(topic,message);} catch (MqttException e){log.error("MqttClient publish text info Error:{}!", e.getMessage());e.printStackTrace();}}
}

4.4 MqttMsgSubscriber

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;@Slf4j
public class MqttMsgSubscriber {String broker = "tcp://127.0.0.1:1883";String topic = "/deviceUp";String username = "admin";String password = "123456";String clientId = System.currentTimeMillis() + "";int qos = 1;public void readSubscribeTopicMessage(){try {MqttClient client = new MqttClient(broker, clientId, new MemoryPersistence());// 连接参数MqttConnectOptions options = new MqttConnectOptions();options.setUserName(username);options.setPassword(password.toCharArray());//是否清除会话options.setCleanSession(true);options.setConnectionTimeout(60);options.setKeepAliveInterval(60);client.setCallback(new MqttCallback() {@Overridepublic void connectionLost(Throwable throwable) {log.error("连接丢失");}@Overridepublic void messageArrived(String s, MqttMessage mqttMessage) throws Exception {log.info("topic为: " + topic);log.info("qos为: " + mqttMessage.getQos());log.info("消息内容为: " + new String(mqttMessage.getPayload()));}@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {// 当消息被完全传送出去后调用log.info("交付完成 ---Delivery complete!");// 可以在这里处理一些发送完成后的清理工作}});client.connect(options);client.subscribe(topic, qos);} catch (MqttException e){log.error("MqttMsgSubscriber 连接启动异常:{}", e.getMessage());} catch (Exception e){log.error("MqttMsgSubscriber 读取消息异常:{}", e.getMessage());}}}

4.5 TestController

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.web.bind.annotation.*;import java.util.List;@Slf4j
@RestController
@RequestMapping()
public class TestController {@GetMapping("/test/mqtt/{msg}")public String testSendMqttMsg(@PathVariable("msg") String msg){log.info("消息内容:{}.", msg);MqttClient mqttClient = MqttClientConnectorPool.connectMQTT();MqttMsgSender sender = new MqttMsgSender();String content = "{" + " \"deviceNo\": \"" + msg + "\"," + " \"val\": 232.5" + "}";String topic = "/deviceUp";int qos = 1;if (null != mqttClient){sender.sendMessage(mqttClient, topic, content, qos);} else {log.info("MqttClient为空,无法发送!");return "失败!";}return "成功!";}}

4.6 BusinessApplicationStartup

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import 包路径(可以删掉这一行手动导入).MqttMsgSubscriber;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;@Slf4j
@Order(10)
@Component
public class BusinessApplicationStartup implements ApplicationRunner {@Overridepublic void run(ApplicationArguments args) throws Exception {log.info("MqttClientConnectorPool ===================== Startup");MqttClientConnectorPool.connectMQTT();log.info("MqttClientConnectorPool ===================== recoveryAllJob Over !");log.info("MqttMsgSubscriber ===================== Startup");// 先订阅等待MqttMsgSubscriber subscriber = new MqttMsgSubscriber();subscriber.readSubscribeTopicMessage();}
}

4.7 BusinessApplicationShutdown

import 包路径(可以删掉这一行手动导入).MqttClientConnectorPool;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.stereotype.Component;@Slf4j
@Component
public class BusinessApplicationShutdown implements ApplicationListener<ContextClosedEvent> {@Overridepublic void onApplicationEvent(ContextClosedEvent contextClosedEvent) {log.info("服务终止! shutdown hook, ContextClosedEvent");MqttClientConnectorPool.closeStaticClient();}}

4.8 MqttBrokerServer

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;@EnableScheduling
@SpringBootApplication
public class MqttBrokerServer {public static void main(String[] args) {SpringApplication.run(MqttBrokerServer.class, args);}}

5.其他备注

5.1 需要Mqtt(Broker)服务器

        如果是直接使用示例代码的Mqtt服务器(Broker)配置,需要在自己电脑上安装Mqtt服务器,如mosquitto、EMQX等,具体自行搜索,或者使用公用的Mqtt服务器(我没测试试过

// 📢注意,当前Broker本人未测试
String broker = "tcp://broker.emqx.io:1883";
String topic = "mqtt/test";
String username = "emqx";
String password = "public";

5.2 调试地址

如果配置文件没配置[server.servlet.context-path],就不需要我自己/backend这一段

6.参考文章

MQTT协议介绍及Java教程

https://baijiahao.baidu.com/s?id=1801542244354727565&wfr=spider&for=pc

7.喜欢作者

暂无

相关文章:

  • Leetcode 11.乘最多水的容器(字节,快手面试题)
  • 【Spring基础3】- Spring的入门程序
  • 【python进阶攻略13】协程、内存copy、多进程
  • AI大模型面试大纲
  • Flutter中使用FFI的方式链接C/C++的so库(harmonyos)
  • 万象奥科工业平板上线,邀您体验与众不同!
  • 聊一下数据脱敏
  • 【机器学习(五)】分类和回归任务-AdaBoost算法
  • webpack 4 的 30 个步骤构建 react 开发环境
  • .NET CORE程序发布IIS后报错误 500.19
  • 嵌入式必懂微控制器选型:STM32、ESP32、AVR与PIC的比较分析
  • 银河麒麟,apt 安装软件报错640Unknown Status
  • JUC高并发编程5:多线程锁
  • 滚雪球学Oracle[7.1讲]:Oracle云数据库
  • Android Studio | 无法识别Icons.Default.Spa中的Spa
  • 【许晓笛】 EOS 智能合约案例解析(3)
  • Android 控件背景颜色处理
  • CSS 三角实现
  • java架构面试锦集:开源框架+并发+数据结构+大企必备面试题
  • Java精华积累:初学者都应该搞懂的问题
  • JS进阶 - JS 、JS-Web-API与DOM、BOM
  • JS题目及答案整理
  • js正则,这点儿就够用了
  • mockjs让前端开发独立于后端
  • nginx(二):进阶配置介绍--rewrite用法,压缩,https虚拟主机等
  • PHP 7 修改了什么呢 -- 2
  • PHP那些事儿
  • sublime配置文件
  • vue-cli3搭建项目
  • 分享几个不错的工具
  • 基于MaxCompute打造轻盈的人人车移动端数据平台
  • - 语言经验 - 《c++的高性能内存管理库tcmalloc和jemalloc》
  • 怎样选择前端框架
  • 走向全栈之MongoDB的使用
  • 《TCP IP 详解卷1:协议》阅读笔记 - 第六章
  • ​LeetCode解法汇总2808. 使循环数组所有元素相等的最少秒数
  • ​ssh免密码登录设置及问题总结
  • #Java第九次作业--输入输出流和文件操作
  • #NOIP 2014# day.2 T2 寻找道路
  • #预处理和函数的对比以及条件编译
  • $refs 、$nextTic、动态组件、name的使用
  • (C++)栈的链式存储结构(出栈、入栈、判空、遍历、销毁)(数据结构与算法)
  • (PWM呼吸灯)合泰开发板HT66F2390-----点灯大师
  • (附源码)ssm基于web技术的医务志愿者管理系统 毕业设计 100910
  • (附源码)计算机毕业设计SSM基于java的云顶博客系统
  • (考研湖科大教书匠计算机网络)第一章概述-第五节1:计算机网络体系结构之分层思想和举例
  • (十一)手动添加用户和文件的特殊权限
  • (四)Linux Shell编程——输入输出重定向
  • (四)软件性能测试
  • (一)基于IDEA的JAVA基础1
  • (转)h264中avc和flv数据的解析
  • .net 7和core版 SignalR
  • .NET MVC 验证码
  • .net websocket 获取http登录的用户_如何解密浏览器的登录密码?获取浏览器内用户信息?...
  • .net2005怎么读string形的xml,不是xml文件。