当前位置: 首页 > news >正文

Java使用MQTT协议

MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议)是一种轻量级的、基于发布/订阅模式的物联网通信协议。它构建于TCP/IP协议之上,由IBM在1999年发布。MQTT的主要特点包括:

  • 轻量级与高效:MQTT设计用于在带宽有限、网络不稳定的环境中工作,具有较小的数据包开销和较低的带宽占用。
  • 高可靠性:使用TCP协议传输,确保消息传递的可靠性。
  • 发布/订阅模式:支持一对多的消息发布,降低应用程序之间的耦合度。
  • 广泛适用性:广泛应用于物联网、智能家居、小型设备等领域,特别适用于机器与机器(M2M)通信。

MQTT协议通过简单的发布和订阅机制,实现了消息的可靠传输和分发,是物联网领域中的重要通信协议之一。

1、需要了解的理论知识

1.1、MQTT 的工作原理

要了解 MQTT 的工作原理,首先需要掌握以下几个概念:MQTT 客户端、MQTT Broker、发布-订阅模式、主题、QoS。

MQTT 客户端

任何运行 MQTT 客户端库的应用或设备都是 MQTT 客户端。例如,使用 MQTT 的即时通讯应用是客户端,使用 MQTT 上报数据的各种传感器是客户端,各种 MQTT 测试工具也是客户端。

MQTT Broker

MQTT Broker 是负责处理客户端请求的关键组件,包括建立连接、断开连接、订阅和取消订阅等操作,同时还负责消息的转发。一个高效强大的 MQTT Broker 能够轻松应对海量连接和百万级消息吞吐量,从而帮助物联网服务提供商专注于业务发展,快速构建可靠的 MQTT 应用。

发布-订阅模式

发布-订阅模式与客户端-服务器模式的不同之处在于,它将发送消息的客户端(发布者)和接收消息的客户端(订阅者)进行了解耦。发布者和订阅者之间无需建立直接连接,而是通过 MQTT Broker 来负责消息的路由和分发。

主题

MQTT 协议根据主题来转发消息。主题通过 / 来区分层级,类似于 URL 路径,例如:test/topic

1.2、MQTT 的工作流程

在了解了 MQTT 的基本组件之后,让我们来看看它的一般工作流程:

  1. 客户端使用 TCP/IP 协议与 Broker 建立连接,可以选择使用 TLS/SSL 加密来实现安全通信。客户端提供认证信息,并指定会话类型(Clean Session 或 Persistent Session)。
  2. 客户端既可以向特定主题发布消息,也可以订阅主题以接收消息。当客户端发布消息时,它会将消息发送给 MQTT Broker;而当客户端订阅消息时,它会接收与订阅主题相关的消息。
  3. MQTT Broker 接收发布的消息,并将这些消息转发给订阅了对应主题的客户端。它根据 QoS 等级确保消息可靠传递,并根据会话类型为断开连接的客户端存储消息。

2、代码实现

2.1、Maven依赖

MQTT协议有两个版本,一个是3.x,另一个是5.x。本文使用的是3.x

MQTT v3.1

<!--  MQTT v3.1 -->
<dependency><groupId>org.eclipse.paho</groupId><artifactId>org.eclipse.paho.client.mqttv3</artifactId><version>1.2.5</version>
</dependency><!--   MQTT 5.0     https://mvnrepository.com/artifact/org.eclipse.paho/org.eclipse.paho.mqttv5.client -->
<!--		<dependency>-->
<!--			<groupId>org.eclipse.paho</groupId>-->
<!--			<artifactId>org.eclipse.paho.mqttv5.client</artifactId>-->
<!--			<version>1.2.5</version>-->
<!--		</dependency>-->

2.2、配置文件

broker这里使用免费的公共的服务,也可以自己使用开源项目emqx搭建

# httpserver.port=8091#server.servlet.context-path=/hub# mqtt
mqtt.url=tcp://broker.emqx.io:1883
mqtt.username=
mqtt.password=
mqtt.clientId=java-mqtt-client
mqtt.defaultTopic=test/topic
mqtt.cleanSession=true

2.3、基于MQTT写个service

依赖了lombok简化代码,按需导入

<dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional>
</dependency>
import jakarta.annotation.PostConstruct;
import jakarta.annotation.PreDestroy;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;@Service
@Slf4j
public class MqttService implements MqttCallback {@Value("${mqtt.url}")private String url;@Value("${mqtt.username}")private String username;@Value("${mqtt.password}")private String password;@Value("${mqtt.clientId}")private String clientId;@Value("${mqtt.cleanSession}")private boolean cleanSession;private MqttClient client;private int reconnectDelay = 2000; // 初始重连延迟2秒private int maxReconnectAttempts = 3; // 最大重连尝试次数private int reconnectAttempts = 0;@PostConstructpublic void connect() {try {client = new MqttClient(url, clientId);MqttConnectOptions options = new MqttConnectOptions();// cleanSession为 false 时表示创建一个持久会话,在客户端断开连接时,会话仍然保持并保存离线消息,直到会话超时注销。// cleanSession为 true 时表示创建一个新的临时会话,在客户端断开时,会话自动销毁。// 注意:持久会话恢复的前提是客户端使用固定的 Client ID 再次连接,如果 Client ID 是动态的,那么连接成功后将会创建一个新的持久会话。options.setCleanSession(cleanSession);// 禁用Paho的自动重连,自己控制options.setAutomaticReconnect(false);if (username != null && !username.isEmpty()) {options.setUserName(username);options.setPassword(password.toCharArray());}client.setCallback(this);client.connect(options);// 订阅一个或多个主题client.subscribe("test/topic");} catch (MqttException e){log.error("---mqtt connect fail", e);}}// 实现MqttCallback的方法:connectionLost, messageArrived, deliveryComplete@Overridepublic void connectionLost(Throwable cause) {log.info("---Connection lost! " + cause.getMessage());// 这里可以重新连接MQTT服务器reconnect();}private void reconnect() {if (reconnectAttempts < maxReconnectAttempts) {reconnectAttempts++;log.info("---Attempting to reconnect in " + reconnectDelay + "ms");// 使用ScheduledExecutorService方式实现延迟重连// 为简单起见,使用Thread.sleeptry {Thread.sleep(reconnectDelay);} catch (InterruptedException e) {Thread.currentThread().interrupt();}reconnectDelay *= 2; // 增大重连间隔connect(); // 尝试重新连接} else {log.warn("---Max reconnect attempts reached");// 可以考虑执行一些清理操作或通知操作}}@Overridepublic void messageArrived(String topic, MqttMessage message) throws Exception {// 当消息到达时调用log.info("---Message arrived. Topic: " + topic + " Message: " + new String(message.getPayload()));// 处理消息的逻辑}@Overridepublic void deliveryComplete(IMqttDeliveryToken token) {// 当消息被完全传送出去后调用log.info("---Delivery complete!");// 可以在这里处理一些发送完成后的清理工作}// 发送消息的方法public void publish(String topic, String payload) throws MqttException {MqttMessage message = new MqttMessage(payload.getBytes());// QoS 0,最多交付一次。可能丢失消息// QoS 1,至少交付一次。可以保证收到消息,但消息可能重复// QoS 2,只交付一次。可以保证消息既不丢失也不重复message.setQos(2);client.publish(topic, message);log.info("---Message published: {}", payload);}// 断开连接的方法@PreDestroypublic void disconnect() throws MqttException {if (client != null && client.isConnected()) {client.disconnect();log.info("---Disconnected");}}
}

2.4、测试MqttService

写个控制器测试

import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;@RestController
@RequestMapping("/mqtt")
public class MqttController {@Autowiredprivate MqttService mqttService;@RequestMapping(value = "/send", method = {RequestMethod.GET, RequestMethod.POST})public ResponseEntity<String> sendMessage(@RequestParam String topic, @RequestParam String message) {topic = "test/topic";try {mqttService.publish(topic, message);return ResponseEntity.ok("Message sent successfully");} catch (MqttException e) {return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR).body("Failed to send message: " + e.getMessage());}}
}

浏览器输入:http://localhost:8091/mqtt/send?topic&message=我来了
控制台日志如下:符合预期

2024-08-10 23:13:17 [MQTT Call: java-mqtt-client] INFO  cn.talktrip.mqtt.MqttService - ---Message arrived. Topic: test/topic Message: 我来了
2024-08-10 23:13:17 [http-nio-8091-exec-8] INFO  cn.talktrip.mqtt.MqttService - ---Message published: 我来了
2024-08-10 23:13:17 [MQTT Call: java-mqtt-client] INFO  cn.talktrip.mqtt.MqttService - ---Delivery complete!

参考文档:https://www.emqx.com/zh/blog/the-easiest-guide-to-getting-started-with-mqtt

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 哈希表 242.有效的字母异位词
  • 【leetcode图文详解】特殊数组II : 空间换时间的“记忆化”,越多越好吗?
  • 24暑假算法刷题 | Day30 | 贪心算法 IV | LeetCode 452. 用最少数量的箭引爆气球,435. 无重叠区间,763. 划分字母区间
  • 常用的麦克劳林级数展开式(泰勒展开式)
  • MapReduce 简单介绍
  • Vue3的多种组件通信方式
  • 解决C++读写中文乱码问题, UTF-8与GBK字符的转换 —基于Windows.h
  • RAR文件密码忘记怎么办?三大RAR密码找回工具推荐
  • 苹果macOS 15 Sequoia投屏功能 实现Mac上iPhone桌面管理
  • Windows下curl报错:curl: (3) unmatched close brace/bracket in URL position x
  • 【现代通信技术】走进现代通信系统架构
  • 海康相机二次开发学习笔记1-环境配置
  • 【2024】k8s集群 图文详细 部署安装使用(两万字)
  • Oracle笔记
  • 二叉树详解(1)
  • [分享]iOS开发 - 实现UITableView Plain SectionView和table不停留一起滑动
  • 【附node操作实例】redis简明入门系列—字符串类型
  • 【跃迁之路】【519天】程序员高效学习方法论探索系列(实验阶段276-2018.07.09)...
  • CEF与代理
  • codis proxy处理流程
  • in typeof instanceof ===这些运算符有什么作用
  • JavaScript中的对象个人分享
  • JavaSE小实践1:Java爬取斗图网站的所有表情包
  • java概述
  • Laravel5.4 Queues队列学习
  • leetcode378. Kth Smallest Element in a Sorted Matrix
  • Lucene解析 - 基本概念
  • Yeoman_Bower_Grunt
  • 番外篇1:在Windows环境下安装JDK
  • 三栏布局总结
  • 深度学习在携程攻略社区的应用
  • 使用权重正则化较少模型过拟合
  • 手写双向链表LinkedList的几个常用功能
  • 做一名精致的JavaScripter 01:JavaScript简介
  • Play Store发现SimBad恶意软件,1.5亿Android用户成受害者 ...
  • ​虚拟化系列介绍(十)
  • ### Error querying database. Cause: com.mysql.jdbc.exceptions.jdbc4.CommunicationsException
  • #stm32驱动外设模块总结w5500模块
  • #免费 苹果M系芯片Macbook电脑MacOS使用Bash脚本写入(读写)NTFS硬盘教程
  • ${factoryList }后面有空格不影响
  • (C#)一个最简单的链表类
  • (C语言)输入一个序列,判断是否为奇偶交叉数
  • (NO.00004)iOS实现打砖块游戏(十二):伸缩自如,我是如意金箍棒(上)!
  • (ZT)一个美国文科博士的YardLife
  • (zt)最盛行的警世狂言(爆笑)
  • (附源码)python旅游推荐系统 毕业设计 250623
  • (十) 初识 Docker file
  • (轉貼) UML中文FAQ (OO) (UML)
  • (最全解法)输入一个整数,输出该数二进制表示中1的个数。
  • .ai域名是什么后缀?
  • .NET Core WebAPI中封装Swagger配置
  • .net 发送邮件
  • .Net多线程总结
  • .NET开发不可不知、不可不用的辅助类(一)
  • .net企业级架构实战之7——Spring.net整合Asp.net mvc