当前位置: 首页 > news >正文

在Spring Boot项目中集成和使用MQTT

在物联网(IoT)应用中,MQTT(消息队列遥测传输)协议因其轻量级和高效性被广泛使用。在Spring Boot项目中,我们可以通过集成org.springframework.integration:spring-integration-mqtt依赖来实现对MQTT的支持。本文将逐步介绍如何在Spring Boot应用中使用MQTT。

1. 添加依赖

首先,我们需要在项目的pom.xml文件中添加Spring Integration MQTT的依赖:

<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring Integration MQTT --><dependency><groupId>org.springframework.integration</groupId><artifactId>spring-integration-mqtt</artifactId></dependency><!-- MQTT Client Library (Paho) --><dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId></dependency>
</dependencies>

2. 配置MQTT

在Spring Boot应用的配置文件application.properties中添加MQTT相关配置:

mqtt.broker.url=tcp://localhost:1883
mqtt.client.id=spring-boot-mqtt-client
mqtt.username=your-username
mqtt.password=your-password
mqtt.default.topic=your/topic

3. 创建MQTT配置类

创建一个新的配置类,用于配置MQTT连接和消息处理:

import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageProducer;
import org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory;
import org.springframework.integration.mqtt.core.MqttPahoClientFactory;
import org.springframework.integration.mqtt.inbound.MqttPahoMessageDrivenChannelAdapter;
import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
import org.springframework.integration.mqtt.support.DefaultPahoMessageConverter;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;@Configuration
public class MqttConfig {@Beanpublic MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();MqttConnectOptions options = new MqttConnectOptions();options.setServerURIs(new String[] { "tcp://localhost:1883" });options.setUserName("your-username");options.setPassword("your-password".toCharArray());factory.setConnectionOptions(options);return factory;}@Beanpublic MessageChannel mqttInputChannel() {return new DirectChannel();}@Beanpublic MessageProducer inbound() {MqttPahoMessageDrivenChannelAdapter adapter =new MqttPahoMessageDrivenChannelAdapter("spring-boot-mqtt-client-inbound",mqttClientFactory(), "your/topic");adapter.setCompletionTimeout(5000);adapter.setConverter(new DefaultPahoMessageConverter());adapter.setQos(1);adapter.setOutputChannel(mqttInputChannel());return adapter;}@Bean@ServiceActivator(inputChannel = "mqttInputChannel")public MessageHandler handler() {return message -> {String payload = (String) message.getPayload();System.out.println("Received message: " + payload);};}@Beanpublic MessageChannel mqttOutboundChannel() {return new DirectChannel();}@Bean@ServiceActivator(inputChannel = "mqttOutboundChannel")public MessageHandler mqttOutbound() {MqttPahoMessageHandler messageHandler =new MqttPahoMessageHandler("spring-boot-mqtt-client-outbound", mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic("your/topic");return messageHandler;}
}

4. 发送和接收消息

在你的服务或控制器中,可以使用如下方法发送消息:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Service;@Service
public class MqttMessageSender {@Autowiredprivate MessageChannel mqttOutboundChannel;public void sendMessage(String topic, String payload) {mqttOutboundChannel.send(MessageBuilder.withPayload(payload).setHeader("mqtt_topic", topic).build());}
}

要接收消息,可以配置handler方法中的处理逻辑,或将消息发送到另一个Spring Integration通道进行进一步处理。

5. 使用示例

在一个控制器中调用发送消息方法:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;@RestController
public class MqttController {@Autowiredprivate MqttMessageSender mqttMessageSender;@GetMapping("/send")public String send(@RequestParam String topic, @RequestParam String message) {mqttMessageSender.sendMessage(topic, message);return "Message sent to topic " + topic;}
}

这样,你就可以通过HTTP请求发送MQTT消息了。例如,访问http://localhost:8080/send?topic=test/topic&message=Hello,将消息发送到MQTT主题test/topic

这就是一个完整的Spring Boot应用中集成MQTT的简单示例,希望对你有所帮助!


MQTT报文头介绍

MQTT协议的请求报文头非常轻量级。MQTT协议定义了固定报文头和可变报文头两部分。以下是各类报文的基本格式:

固定报文头

所有MQTT报文都有一个固定报文头,占据2-5个字节。固定报文头包含报文类型和一些控制标志。

固定报文头格式

  • 第一个字节:

    • 位7-4:报文类型(Message Type)
    • 位3-0:标志(Flags),根据报文类型不同而不同
  • 第二个字节及后续字节:

    • 剩余长度(Remaining Length),表示剩余报文的字节数。采用可变长度编码,每个字节的最高位用于指示是否有后续字节。
各类报文示例

连接报文(CONNECT)

连接报文用于客户端请求与服务器建立连接。其报文头如下:

  • 固定报文头:

    • 第一个字节:0x10(CONNECT报文类型是1,标志位为0000)
    • 第二个字节:剩余长度(根据可变部分长度而定)
  • 可变报文头:

    • 协议名(“MQTT”)
    • 协议级别(4,表示MQTT 3.1.1)
    • 连接标志(Connect Flags)
    • 保持连接时间(Keep Alive)
  • 有效载荷:

    • 客户端标识符(Client Identifier)
    • 用户名(可选)
    • 密码(可选)
    • 遗嘱主题(可选)
    • 遗嘱消息(可选)

连接确认报文(CONNACK)

服务器响应客户端的连接请求。其报文头如下:

  • 固定报文头:

    • 第一个字节:0x20(CONNACK报文类型是2,标志位为0000)
    • 第二个字节:剩余长度(2字节)
  • 可变报文头:

    • 连接确认标志(0x00或0x01)
    • 返回码(0表示连接成功,其他值表示错误)

发布报文(PUBLISH)

客户端或服务器发送消息到指定主题。其报文头如下:

  • 固定报文头:

    • 第一个字节:0x30(PUBLISH报文类型是3,标志位根据QoS等级、重复标志和保留标志变化)
    • 第二个字节:剩余长度(根据主题名、消息ID和消息体长度而定)
  • 可变报文头:

    • 主题名(Topic Name)
    • 消息ID(QoS等级为1或2时需要)
  • 有效载荷:

    • 消息内容

示例

以下是一个PUBLISH报文的示例:

30 0B                # 固定报文头 (PUBLISH,QoS 0)
00 05                # 主题名长度
74 6F 70 69 63       # 主题名 "topic"
68 65 6C 6C 6F       # 消息内容 "hello"

在这个示例中:

  • 第一个字节 0x30 表示这是一个PUBLISH报文,QoS等级为0,重复标志和保留标志为0。
  • 第二个字节 0x0B 表示剩余长度为11个字节。
  • 接下来的两个两个字节 0x00 0x05 表示主题名的长度为5个字节。
  • 接下来的5个字节 0x74 0x6F 0x70 0x69 0x63 表示主题名 “topic”。
  • 最后5个字节 0x68 0x65 0x6C 0x6C 0x6F 表示消息内容 “hello”。

这种结构使得MQTT报文非常紧凑和高效,特别适合物联网设备的通信。希望这篇文章能帮助你更好地理解和使用MQTT协议。

相关文章:

  • Qt 概述
  • 什么是SPI,和API有啥区别
  • RustGUI学习(iced/iced_aw)之扩展小部件(二十七):如何使用number_input部件?
  • 嵌入式0基础开始学习 ⅠC语言(2)运算符与表达式
  • 移动云以深度融合之服务,令“大”智慧贯穿云端
  • mysql IF语句,模糊检索
  • Python——二维字典
  • MybatisPlus中自定义sql
  • 【数据结构】二叉树的认识与实现
  • BGP策略实验(路径属性和选路规则)
  • C# 集合(六) —— 自定义集合Collection类
  • 音视频开发8 音视频中SDL的使用,SDL 在windows上环境搭建,SDL 使用 以及 常用 API说明,show YUV and play PCM
  • C++第十七弹---string使用(下)
  • 详细分析Element Plus中的ElMessageBox弹窗用法(附Demo及模版)
  • Java 三种主流的消息中间件 RabbitMQ、Kafka 和 RocketMQ 特点以及适用,使用场景 学习总结
  • 【译】React性能工程(下) -- 深入研究React性能调试
  • CentOS7简单部署NFS
  • CODING 缺陷管理功能正式开始公测
  • HTTP 简介
  • java B2B2C 源码多租户电子商城系统-Kafka基本使用介绍
  • Js实现点击查看全文(类似今日头条、知乎日报效果)
  • laravel 用artisan创建自己的模板
  • leetcode386. Lexicographical Numbers
  • log4j2输出到kafka
  • Promise面试题,控制异步流程
  • quasar-framework cnodejs社区
  • SegmentFault 社区上线小程序开发频道,助力小程序开发者生态
  • vue-cli3搭建项目
  • 基于web的全景—— Pannellum小试
  • 再次简单明了总结flex布局,一看就懂...
  • 大数据全解:定义、价值及挑战
  • 容器镜像
  • # Redis 入门到精通(一)数据类型(4)
  • #android不同版本废弃api,新api。
  • (26)4.7 字符函数和字符串函数
  • (LNMP) How To Install Linux, nginx, MySQL, PHP
  • (MIT博士)林达华老师-概率模型与计算机视觉”
  • (板子)A* astar算法,AcWing第k短路+八数码 带注释
  • (第9篇)大数据的的超级应用——数据挖掘-推荐系统
  • (附源码)ssm基于web技术的医务志愿者管理系统 毕业设计 100910
  • (十二)Flink Table API
  • (十五)devops持续集成开发——jenkins流水线构建策略配置及触发器的使用
  • (数据大屏)(Hadoop)基于SSM框架的学院校友管理系统的设计与实现+文档
  • (一)springboot2.7.6集成activit5.23.0之集成引擎
  • (游戏设计草稿) 《外卖员模拟器》 (3D 科幻 角色扮演 开放世界 AI VR)
  • (自适应手机端)响应式服装服饰外贸企业网站模板
  • ****Linux下Mysql的安装和配置
  • .bat批处理(三):变量声明、设置、拼接、截取
  • .net core + vue 搭建前后端分离的框架
  • .NET Core 和 .NET Framework 中的 MEF2
  • .NET Micro Framework初体验
  • .net MVC中使用angularJs刷新页面数据列表
  • .Net OpenCVSharp生成灰度图和二值图
  • .NET 的程序集加载上下文
  • .Net 基于IIS部署blazor webassembly或WebApi