当前位置: 首页 > news >正文

EMQX Cloud全托管的 MQTT 消息云服务

从前,我向往远方,喜欢到处旅游🚶🚶🚶;后来,我追忆往事,最爱故地重游。每到一处,勾起故人往事,总能被人生的沧桑感所震撼。

文章目录

    • 前言
    • 1. 物联网消息收发模型
      • 1.1 双向通信
      • 1.2 数据采集
      • 1.3 混合模型
    • 2. 服务部署
      • 2.1 新建部署
      • 2.2 添加认证
    • 3. 编码实践
      • 3.1 编写代码
      • 3.2 测试
    • 4. 在线调试

完整代码已上传Gitee

前言

MQTT协议是一种消息列队传输协议,采用订阅、发布机制,订阅者只接收自己已经订阅的数据,非订阅数据则不接收,既保证了必要的数据的交换,又避免了无效数据造成的储存与处理。因此在工业物联网中得到广泛的应用。

EMQX Cloud 是 EMQ 公司推出的一款面向物联网领域的 MQTT 消息中间件产品。作为全球首个全托管的 MQTT 5.0 公有云服务,EMQX Cloud 提供了一站式运维代管、独有隔离环境的 MQTT 消息服务。在万物互联的时代,EMQX Cloud 可以帮助您快速构建面向物联网领域的行业应用,轻松实现物联网数据的采集、传输、计算和持久化。

在这里插入图片描述

1. 物联网消息收发模型

1.1 双向通信

EMQX Cloud 支持海量设备及应用端连接,为应用程序及物联网设备提供安全可靠的双向通信能力:

在这里插入图片描述

在该模型中,EMQX Cloud 提供的 MQTT 服务将海量设备与应用连接起来,支持应用与设备间的双向通信,也支持设备与设备间的双向通信。该模型适用于有类即时通讯需求的物联网应用,比较典型的如:智能家居场景中,手机 APP 获取智能设备的状态信息,并且用户可以通过 APP 向智能设备发送控制指令。又如在工业场景中,AGV 机器人之间通过 MQTT 协议来进行即时通信,实现多机协作。EMQX Cloud 提供的 MQTT 服务不仅支持标准 MQTT 协议,也支持 MQTT over WebSocket,以及 CoAP、 MQTT-SN、LwM2M、JT/T808等协议,只需一个消息中间件即可满足多类终端同时接入的需求。

1.2 数据采集

EMQX Cloud 支持设备数据上云,通过海量 Topic 及数据集成的支持,低代码即可实现数据的采集、过滤、转换、计算及持久化。

在这里插入图片描述

在该模型中,EMQX Cloud 提供的 MQTT 服务可以实现数据的采集、计算和持久化。该模型适用于有数据采集和持久化需求的物联网应用,比较典型的如:在工业场景中,各个物联网传感器将实时采集的数据汇集到边缘网关,通过边缘网关将数据上传到 MQTT 服务器上,再由数据集成触发数据的过滤、转换和简单计算,并将最终结果转发至其他服务或持久化至目标数据库中。EMQX Cloud 提供了多种接入方案,涵盖了不同的网络条件、各种类型终端设备和边缘网关设备,支持70多种工业协议接入。

1.3 混合模型

EMQX Cloud 提供的 MQTT 服务支持双向通信和数据采集模型的混合应用。通过共享订阅、数据集成等能力,实现数据在物与物、物与应用间流转的同时进行持久化。

在这里插入图片描述

在该模型中,EMQX Cloud 提供的 MQTT 服务不仅为设备与设备、设备与应用间架起桥梁,同时可将需要的数据进行持久化,以便非实时应用在后续对获取的数据加以利用。比较典型的如一些人工智能应用,终端获取的数据需要发送至云端,通过云端运行的计算模型经过计算后即时反馈给终端,如物品或人脸识别应用。同时数据的副本需要持久化到数据库中,以便于后续离线训练和改进人工智能计算模型。

2. 服务部署

2.1 新建部署

首先在平台注册账号,新用户可以试用14天

在这里插入图片描述

新建部署:

在这里插入图片描述

在这里插入图片描述

新建项目,将部署服务以组的形式进行管理

在这里插入图片描述

新建项目完成后,可以将上边部署服务移动到所属项目下,方便管理。

2.2 添加认证

在这里插入图片描述

添加账号密码,连接认证的时候需要用到。

管理控制台:

在这里插入图片描述

3. 编码实践

案例采用Java整合

3.1 编写代码

pom

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
     <!--EMQX依赖-->
    <dependency>
        <groupId>org.eclipse.paho</groupId>
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
        <version>1.2.5</version>
    </dependency>
</dependencies>

启动类:

/**
 * @description: EMQX CLOUD-DEMO
 * @author: yh
 * @date: 2022/9/10
 */
@SpringBootApplication
public class ExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class,args);
    }
}

发布订阅:

/**
 * CMQX CLOUD 发布订阅
 *
 * @author: yh
 * @date: 2022/9/10
 */
@RequestMapping(value = "/MqttClient")
@RestController
public class MqttSample {
    MqttClient client = null;
    // 发布、订阅主题
    String topic = "test/topic";
    // 消息内容
    String content = "Hello World EMQ";
    // qos消息的服务质量可选值:0 1 2
    int qos = 2;
    // EMQ 部署控制台的连接地址
    String broker = "tcp://q6dec0f4.cn-shenzhen.emqx.cloud:11578";
    String clientId = MqttClient.generateClientId();

    public MqttSample() {
        //  持久化
        MemoryPersistence persistence = new MemoryPersistence();
        // MQTT 连接选项
        MqttConnectOptions connOpts = new MqttConnectOptions();
        // 设置认证信息,配置的账号 密码
        connOpts.setUserName("exqcloud");
        connOpts.setPassword("hello".toCharArray());
        try {
            client = new MqttClient(broker, clientId, persistence);
            // 设置回调
            client.setCallback(new SampleCallback());
            // 建立连接
            System.out.println("Connecting to broker: " + broker);
            client.connect(connOpts);
            System.out.println("Connected to broker: " + broker);
            // 订阅 topic
            client.subscribe(topic, qos);
            System.out.println("Subscribed to topic: " + topic);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 消息发布
     *
     * @author: yh
     * @date: 2022/9/10
     */
    @RequestMapping(value = "/send")
    public void send() {
        try {
            // 发布消息
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            //向服务器上的topic发布消息
            client.publish(topic, message);
            System.out.println("Message published");
            // 断开连接
            //client.disconnect();
            //System.out.println("Disconnected");
            // 关闭客户端
            //client.close();
            //System.exit(0);
        } catch (MqttException me) {
            System.out.println("reason " + me.getReasonCode());
            System.out.println("msg " + me.getMessage());
            System.out.println("loc " + me.getLocalizedMessage());
            System.out.println("cause " + me.getCause());
            System.out.println("excep " + me);
            me.printStackTrace();
        }
    }
}

回调类:

/**
 * 回调类
 * @author:  yh
 * @date:  2022/9/10
 */
public class SampleCallback implements MqttCallback {
    /**
     * 连接丢失
     * @author:  yh
     * @date:  2022/9/10
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("连接断开:" + cause.getMessage());
    }

    /**
     * 收到消息
     * @author:  yh
     * @date:  2022/9/10
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        System.out.println("接收到消息-- topic:" + topic + ",Qos:" + message.getQos() + ", 内容:" + new String(message.getPayload()));
    }

    /**
     * 消息传递成功
     * @author:  yh
     * @date:  2022/9/10
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("消息发送成功!");
    }
}

3.2 测试

启动程序

在这里插入图片描述

在这里插入图片描述

控制台连接数有点延迟,没出来的话多等几秒看看。

接下来就开始:编程第一步 Hello World,访问 http://127.0.0.1:8080/MqttClient/send

在这里插入图片描述

通过控制台可以看到,消息被成功发布、消费,多发几次消息看看

在这里插入图片描述

稍等一段时间后,就可以在 部署→指标 中看到相关的数据统计

在这里插入图片描述

我这里连续着发送了10条消息

在这里插入图片描述

4. 在线调试

有时候消息的发布不是由我们自己来发布,我们只负责消费。这种场景下在开发阶段模拟一个发布者是非常必要的,通过控制台的在线调试功能就可以直接发布消息方便调试。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

发送内容:

{"key":"hello","value":"你好"}

在这里插入图片描述

多试几次:

在这里插入图片描述

在这里插入图片描述


推荐一个项目 Spring整合常用组件

到此,本章内容就介绍完啦,如果有帮助到你 欢迎点个赞👍👍👍吧!!您的鼓励是博主的最大动力! 有问题评论区交流。

相关文章:

  • 【软考 系统架构设计师】操作系统① 操作系统概述
  • ARC113E Rvom and Rsrev
  • Windows与网络基础-26-IP地址概述
  • 模拟用户登录功能的实现以及演示SQL注入现象
  • 天龙八部科举答题问题和答案(全3/8)
  • CH342芯片应用—硬件设计指南
  • 【Android】-- 如何使用按钮和图片(点击事件、长按点击、同时展示文本和图像、ImageView)
  • 什么是文件格式的幻数
  • 【数据结构】绪论
  • C++的4种管理数据内存的方式
  • 中秋节的月亮怎么拍?不用手机和相机,程序员照样能拍出大片的感觉
  • Windows性能监控工具ypeperf
  • Python基础语法(二)—— 条件语句(if)+循环语句(for+while)
  • webpack基础使用
  • 基于蜜蜂算法求解电力系统经济调度(Matlab代码实现)
  • CSS3 聊天气泡框以及 inherit、currentColor 关键字
  • CSS进阶篇--用CSS开启硬件加速来提高网站性能
  • Java 网络编程(2):UDP 的使用
  • JavaScript 基本功--面试宝典
  • Javascript设计模式学习之Observer(观察者)模式
  • Java多态
  • Linux gpio口使用方法
  • Netty 4.1 源代码学习:线程模型
  • Spring核心 Bean的高级装配
  • supervisor 永不挂掉的进程 安装以及使用
  • vue总结
  • 使用common-codec进行md5加密
  • 使用putty远程连接linux
  • 要让cordova项目适配iphoneX + ios11.4,总共要几步?三步
  • 在weex里面使用chart图表
  • 阿里云IoT边缘计算助力企业零改造实现远程运维 ...
  • #include到底该写在哪
  • (4) PIVOT 和 UPIVOT 的使用
  • (libusb) usb口自动刷新
  • (NO.00004)iOS实现打砖块游戏(十二):伸缩自如,我是如意金箍棒(上)!
  • (附源码)计算机毕业设计ssm基于B_S的汽车售后服务管理系统
  • (算法二)滑动窗口
  • (原創) 人會胖會瘦,都是自我要求的結果 (日記)
  • *++p:p先自+,然后*p,最终为3 ++*p:先*p,即arr[0]=1,然后再++,最终为2 *p++:值为arr[0],即1,该语句执行完毕后,p指向arr[1]
  • .net core 客户端缓存、服务器端响应缓存、服务器内存缓存
  • .NET 服务 ServiceController
  • .NET开源项目介绍及资源推荐:数据持久层
  • .net使用excel的cells对象没有value方法——学习.net的Excel工作表问题
  • [2008][note]腔内级联拉曼发射的,二极管泵浦多频调Q laser——
  • [AI]文心一言出圈的同时,NLP处理下的ChatGPT-4.5最新资讯
  • [Android] Amazon 的 android 音视频开发文档
  • [AndroidStudio]_[初级]_[修改虚拟设备镜像文件的存放位置]
  • [Codeforces1137D]Cooperative Game
  • [Contiki系列论文之2]WSN的自适应通信架构
  • [C语言]——内存函数
  • [elastic 8.x]java客户端连接elasticsearch与操作索引与文档
  • [Leetcode] Permutations II
  • [leetcode]Clone Graph
  • [LeetCode]Max Points on a Line
  • [Linux内存管理-分页机制]—把一个虚拟地址转换为物理地址