当前位置: 首页 > news >正文

SpringBoot集成MQTT实现交互服务通信

引言

本文是springboot集成mqtt的一个实战案例。
gitee代码库地址:源码地址

一、什么是MQTT

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的“轻量级”通讯协议,该协议构建于 TCP/IP 协议上,由 IBM 于 1999 年发明。MQTT 协议的主要特征是开放、简单、轻量级和易于实现,这些特征使得它适用于受约束的应用环境,如:

网络受限:网络带宽较低且传输不可靠
终端受限:协议运行在嵌入式设备上,嵌入式终端的处理器、内存等是受限的

MQTT 非常适用于物联网领域,如传感器与服务器的通信、传感器信息采集等。

二、发布/订阅模式

发布/订阅模式(Publish/Subscribe Pattern,简称Pub/Sub)是一种消息通信模式,在这种模式下,消息的发送者(发布者)不会将消息直接发送给特定的接收者(订阅者)。而是将代表消息内容的通知(事件)发布到一个特定的主题或频道上,而订阅了这个主题的接收者会收到所有在这个主题上发布的通知。这种模式解耦了消息的发送者和接收者,使得系统更加灵活和可扩展。

主要组成部分

  1. 发布者(Publisher):负责生成消息并将其发布到特定的主题或频道。

  2. 订阅者(Subscriber):注册对特定主题的兴趣,并接收该主题上的所有消息。

  3. 消息代理(Message Broker):作为中间件,它接收来自发布者的消息,并将这些消息传递给所有相关的订阅者。

优点

  • 解耦:发布者和订阅者之间不需要直接交互,这降低了系统的耦合度。

  • 灵活性:可以动态添加或删除订阅者,不影响其他组件。

  • 可扩展性:系统容易扩展,可以轻松增加新的发布者或订阅者。

缺点

  • 复杂性:引入了额外的组件(如消息代理),增加了系统的复杂性和管理成本。

  • 性能开销:消息的传递需要通过中间件,可能会有延迟和性能损失。

应用场景

  • 事件驱动架构:在微服务架构中,不同的服务通过发布/订阅模式进行异步通信。

  • 数据流处理:如实时数据分析,多个组件可以订阅数据流并进行处理。

  • 分布式系统:用于跨系统或跨服务的消息传递。

发布/订阅模式并不是 MQTT 协议特有的模式,很多消息中间件都有使用发布/订阅模式,有同学可能认为这就是观察者模式,还真不是,这两个模式很容易混淆。观察者模式只有观察者 + 被观察者两个角色,而发布/订阅模式还有一个经纪人 Broker;往更深层次的讲观察者和被观察者,是松耦合的关系,而发布者和订阅者,则完全不存在耦合。

三、Windows下安装MQTT消息服务器

非常遗憾,EMQ X Broker 在 5.4.0 版本的发行版中已不支持 windows 版本的安装包了,笔者从网上找了一个最后支持版本的压缩包,已上传资源。

  • 解压后,在bin文件下,使用cmd执行运行命令 .\emqx console
  • 访问MQTT管理页面 http://localhost:18083/#/ 用户名密码 admin/public

如果报错缺少Erlang环境,需要自行安装下该环境

在这里插入图片描述
在这里插入图片描述

浏览器访问:http://localhost:18083/#,输入账号密码进入,会要求你修改密码,可以暂时跳过

在这里插入图片描述

四、Windows安装MQTT消息代理客户端MQTTX

下载地址:MQTTX下载地址

点击免费下载
在这里插入图片描述

选择64位版本

在这里插入图片描述
下好后点击安装,启动运行界面如下:
在这里插入图片描述
语言是英文,可以在设置按钮里调成中文。这个客户端代理主要是进行消息发送的测试服务。

五、新建MQTT集成项目

随便新建了一个springboot应用,用的是JDK17,在pom文件中引入如下依赖:

        <!-- MQTT --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version></dependency>

5.1 yml配置

server:port: 8081#允许循环依赖
spring:main:allow-circular-references: truecustomer:mqtt:broker: tcp://localhost:1883clientList:#发布客户端ID- clientId: nays_service#监听主题 同时订阅多个主题 使用 - 分割开subscribeTopic: mqtt/publish#用户名userName: admin#密码password: public#接收客户端ID- clientId: receive_service#监听主题 同时订阅多个主题 使用 - 分割开subscribeTopic: mqtt/receive#用户名userName: admin#密码password: public

5.2 Mqtt配置类

package com.hulei.mqttproject.config;import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;import java.util.List;/*** Mqtt配置类*/
@Data
@Configuration
@ConfigurationProperties(prefix = "customer.mqtt")
public class MqttConfig {/*** mqtt broker地址*/String broker;/*** 需要创建的MQTT客户端*/List<MqttClient> clientList;
}

5.3 MQTT客户端

package com.hulei.mqttproject.config;import lombok.Data;/*** MQTT客户端*/
@Data
public class MqttClient {/*** 客户端ID*/private String clientId;/*** 监听主题*/private String subscribeTopic;/*** 用户名*/private String userName;/*** 密码*/private String password;
}

5.4 MQTT客户端管理类

package com.hulei.mqttproject.config;import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** MQTT客户端管理类,如果客户端非常多后续可入redis缓存*/
@Slf4j
@Component
public class MqttClientManager {@Value("${customer.mqtt.broker}")private String mqttBroker;@Resourceprivate MqttCallBackContext mqttCallBackContext;/*** 存储MQTT客户端*/public static Map<String, MqttClient> MQTT_CLIENT_MAP = new ConcurrentHashMap<>();public MqttClient getMqttClientById(String clientId) {return MQTT_CLIENT_MAP.get(clientId);}/*** 创建mqtt客户端** @param clientId       客户端ID* @param subscribeTopic 订阅主题,可为空* @param userName       用户名,可为空* @param password       密码,可为空*/public void createMqttClient(String clientId, String subscribeTopic, String userName, String password) {MemoryPersistence persistence = new MemoryPersistence();try {MqttClient client = new MqttClient(mqttBroker, clientId, persistence);MqttConnectOptions connOpts = new MqttConnectOptions();if (null != userName && !userName.isEmpty()) {connOpts.setUserName(userName);}if (null != password && !password.isEmpty()) {connOpts.setPassword(password.toCharArray());}connOpts.setCleanSession(true);if (null != subscribeTopic && !subscribeTopic.isEmpty()) {AbsMqttCallBack callBack = mqttCallBackContext.getCallBack(clientId);if (null == callBack) {callBack = mqttCallBackContext.getCallBack("default");}callBack.setClientId(clientId);callBack.setConnectOptions(connOpts);client.setCallback(callBack);}//连接mqtt服务端brokerclient.connect(connOpts);// 订阅主题if (null != subscribeTopic && !subscribeTopic.isEmpty()) {if (subscribeTopic.contains("-"))client.subscribe(subscribeTopic.split("-"));else {client.subscribe(subscribeTopic);}}MQTT_CLIENT_MAP.putIfAbsent(clientId, client);} catch (MqttException e) {log.error("Create mqttClient failed!", e);}}
}

5.5 MQTT客户端创建

package com.hulei.mqttproject.config;import jakarta.annotation.PostConstruct;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.List;/*** MQTT客户端创建*/
@Component
@Slf4j
public class MqttClientCreate {@Resourceprivate MqttClientManager mqttClientManager;@Resourceprivate MqttConfig mqttConfig;/*** 创建MQTT客户端*/@PostConstructpublic void createMqttClient() {List<MqttClient> mqttClientList = mqttConfig.getClientList();for (MqttClient mqttClient : mqttClientList) {log.info("{}", mqttClient);//创建客户端,客户端ID:demo,回调类跟客户端ID一致mqttClientManager.createMqttClient(mqttClient.getClientId(), mqttClient.getSubscribeTopic(), mqttClient.getUserName(), mqttClient.getPassword());}}
}

5.6 MQTT回调抽象类

package com.hulei.mqttproject.config;import jakarta.annotation.Resource;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttMessage;/*** MQTT回调抽象类*/
@Setter
@Getter
@Slf4j
public abstract class AbsMqttCallBack implements MqttCallback {private String clientId;private MqttConnectOptions connectOptions;@ResourceMqttClientManager mqttClientManager;/*** 失去连接操作,进行重连** @param throwable 异常*/@Overridepublic void connectionLost(Throwable throwable) {try {if (null != clientId) {if (null != connectOptions) {mqttClientManager.getMqttClientById(clientId).connect(connectOptions);} else {mqttClientManager.getMqttClientById(clientId).connect();}}} catch (Exception e) {log.error("{} reconnect failed!", e.getMessage(), e);}}/*** 接收订阅消息* @param topic    主题* @param mqttMessage 接收消息* @throws Exception 异常*/@Overridepublic void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {String content = new String(mqttMessage.getPayload());handleReceiveMessage(topic, content);}/*** 消息发送成功** @param iMqttDeliveryToken toke*/@Overridepublic void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {log.info("消息发送成功");}/*** 处理接收的消息* @param topic   主题* @param message 消息内容*/protected abstract void handleReceiveMessage(String topic, String message);
}

5.7 MQTT订阅回调环境类

package com.hulei.mqttproject.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** MQTT订阅回调环境类*/
@Component
@Slf4j
public class MqttCallBackContext {private final Map<String, AbsMqttCallBack> callBackMap = new ConcurrentHashMap<>();/*** 默认构造函数** @param callBackMap 回调集合*/public MqttCallBackContext(Map<String, AbsMqttCallBack> callBackMap) {this.callBackMap.putAll(callBackMap);}/*** 获取MQTT回调类** @param clientId 客户端ID* @return MQTT回调类*/public AbsMqttCallBack getCallBack(String clientId) {return this.callBackMap.get(clientId);}
}

5.8 默认回调类

package com.hulei.mqttproject.config;import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;/*** 默认回调*/
@Slf4j
@Component("default")
public class DefaultMqttCallBack extends AbsMqttCallBack {/*** @param topic   主题* @param message 消息内容*/@Overrideprotected void handleReceiveMessage(String topic, String message) {log.info("接收到主题---{}", topic);log.info("接收到消息---{}", message);// 自定义消息处理业务}
}

六、测试服务类

package com.hulei.mqttproject.controller;import com.hulei.mqttproject.config.MqttClientManager;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;@RestController
@Slf4j
public class SendController {@Resourceprivate MqttClientManager mqttClientManager;@RequestMapping("/sendMessage")public String sendMessage(String topic){try {MqttMessage mqttMessage = new MqttMessage("你好".getBytes());mqttClientManager.getMqttClientById("nays_service").publish(topic,mqttMessage);return "发送成功";} catch (Exception e) {log.error("发送失败",e);return "发送失败";}}
}

七、启动springboot

启动日志可以看到,mqtt消息服务器连接成功

在这里插入图片描述
EMQX工具显示发布客户端和接收客户端均已成功注册

在这里插入图片描述

使用Apifox测试下SendController中的接口,mqtt/receive是yaml中接收客户端订阅的主题,当然也可以往mqtt/publish主题发,mqtt中消息的发布者也可以订阅主题,监听某些消息。

在这里插入图片描述

在这里插入图片描述

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • python 迭代器介绍 map() 函数
  • stm32入门-----EXTI外部中断(上 ——理论篇)
  • TDesign组件库日常应用的一些注意事项
  • 【Datawhale AI 夏令营2024--CV】深度学习入门
  • 泛微e-cology WorkflowServiceXml SQL注入漏洞(POC)
  • 基于嵌入式Linux的高性能车载娱乐系统设计与实现 —— 融合Qt、FFmpeg和CAN总线技术
  • 高职院校人工智能人才培养成果导向系统构建、实施要点与评量方法
  • Elasticsearch 角色和权限管理
  • 3.RabbitMQ安装-Centos7
  • 好用的AI搜索引擎
  • RISC-V在线反汇编工具
  • STM32 IAP 需要关注的一些事
  • 捷配总结的SMT工厂安全防静电规则
  • CSS3实现提示工具的渐入渐出效果及CSS3动画简介
  • k8s一些名词解释
  • [译] 理解数组在 PHP 内部的实现(给PHP开发者的PHP源码-第四部分)
  • [译]如何构建服务器端web组件,为何要构建?
  • “Material Design”设计规范在 ComponentOne For WinForm 的全新尝试!
  • Android开源项目规范总结
  • canvas 绘制双线技巧
  • download使用浅析
  • express + mock 让前后台并行开发
  • Laravel 中的一个后期静态绑定
  • leetcode46 Permutation 排列组合
  • pdf文件如何在线转换为jpg图片
  • Vue小说阅读器(仿追书神器)
  • Vue学习第二天
  • 第十八天-企业应用架构模式-基本模式
  • 浅析微信支付:申请退款、退款回调接口、查询退款
  • 全栈开发——Linux
  • 实现简单的正则表达式引擎
  • 一加3T解锁OEM、刷入TWRP、第三方ROM以及ROOT
  • 原生js练习题---第五课
  • JavaScript 新语法详解:Class 的私有属性与私有方法 ...
  • 蚂蚁金服CTO程立:真正的技术革命才刚刚开始
  • # Python csv、xlsx、json、二进制(MP3) 文件读写基本使用
  • #07【面试问题整理】嵌入式软件工程师
  • #我与虚拟机的故事#连载20:周志明虚拟机第 3 版:到底值不值得买?
  • (12)Linux 常见的三种进程状态
  • (52)只出现一次的数字III
  • (PyTorch)TCN和RNN/LSTM/GRU结合实现时间序列预测
  • (Spark3.2.0)Spark SQL 初探: 使用大数据分析2000万KF数据
  • (编译到47%失败)to be deleted
  • (分布式缓存)Redis分片集群
  • (附源码)计算机毕业设计SSM疫情下的学生出入管理系统
  • (学习日记)2024.02.29:UCOSIII第二节
  • (原创)Stanford Machine Learning (by Andrew NG) --- (week 9) Anomaly DetectionRecommender Systems...
  • (转)负载均衡,回话保持,cookie
  • .net 4.0发布后不能正常显示图片问题
  • .NET 4.0网络开发入门之旅-- 我在“网” 中央(下)
  • .net core 管理用户机密
  • .NET NPOI导出Excel详解
  • .net SqlSugarHelper
  • .Net 访问电子邮箱-LumiSoft.Net,好用
  • .NET/C# 使窗口永不获得焦点