当前位置: 首页 > news >正文

test_mqtt/python

#subscript端的功能
import csv
import os
import random
import timefrom paho.mqtt import client as mqtt_client
broker = '175.24.64.90'
port = 1883
# topic = "/v1_1/up/device_upload/EMS1"
topic = "/api/ems_cloud/v1_1/operate_time_command/{HZ01}"
# generate client ID with pub prefix rando
client_id = f'python-mqtt-{random.randint(1000, 2000)}'file = open("output.csv", 'w').close()  # 清空文件之前的内容def write_csv(mPayload, mTopic):# 创建文件夹和表头# os.makedirs(os.path.join('..', 'data'), exist_ok=True)  # 创建数据文件夹# data_file = os.path.join('..', 'data', 'output.csv')# with open(data_file, 'w', encoding='utf-8', newline='') as f:#    writer = csv.writer(f)#   writer.writerow(['payload', 'topic'])# 追加数据with open("output.csv", 'a', encoding='utf-8', newline='') as f:f.seek(0)writer = csv.writer(f)writer.writerow([mPayload, mTopic])
# publish端的功能
import random
import time
import datetime
from paho.mqtt import client as mqtt_clientbroker = '175.24.64.90'
port = 1883
topic = "/v1_1/down/remote_control/EMS1"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
pub_data = {"cmdid": 1024,                          # 命令id,便于边缘侧做命令反馈"ts": datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),                            # 秒级时间戳"identify": "simulator",                      # 受控设备identify"data": [{"identity": "cmd_bool",                # 受控点identity"value": 1,                      # 受控点写入值},{"identity": "cmd_ushort","value": 10,},{"identity": "cmd_float",  # 受控点identity"value": -11.98,  # 受控点写入值}]
}# msg = json.dumps(pub_data)
msg = str(pub_data)def connect_mqtt():def on_connect(client, userdata, flags, rc):if rc == 0:print("Connected to MQTT Broker!")else:print("Failed to connect, return code %d\n", rc)client = mqtt_client.Client(client_id)client.on_connect = on_connectclient.username_pw_set("emsuser","emsuser")client.connect(broker, port, 60)return clientdef publish(client):msg_count = 0while True:time.sleep(1)result = client.publish(topic, msg)# result: [0, 1]status = result[0]if status == 0:print(f"Send `{msg}` to topic `{topic}`")else:print(f"Failed to send message to topic {topic}")msg_count += 1def run():client = connect_mqtt()client.loop_start()publish(client)if __name__ == '__main__':run()

相关文章:

  • 北京网站建设多少钱?
  • 辽宁网页制作哪家好_网站建设
  • 高端品牌网站建设_汉中网站制作
  • 帝国CMS如何修改时间格式,变成几分钟,几小时教程
  • 常用 CSS 写法
  • 苹果MacOS系统使用微软远程桌面连接Windows电脑桌面详细步骤
  • Mac配置node环境
  • etcd 和 MongoDB 的混沌(故障注入)测试方法
  • 数据网络理论基础 第三章网络的时延模型
  • [力扣题解] 841. 钥匙和房间
  • 深入探讨 Java 8 集合操作:全面解析 Stream API 的强大功能
  • STM32_HAL__TIM_输出比较
  • C语言----判断n是否是2的次方数,利用到按位与,算法n(n-1)
  • 在linux中配置关于GFS创建各种卷以及卷组--配置实验
  • Anti Desgin Vue 实现 表格可编辑、新增、删除功能
  • 前端开发工程师——webpack
  • HOOK定义
  • 力扣爆刷第144天之二叉树四连刷(完结二叉搜索树改变树结构)
  • Babel配置的不完全指南
  • eclipse(luna)创建web工程
  • IndexedDB
  • Java IO学习笔记一
  • Javascript Math对象和Date对象常用方法详解
  • Mac 鼠须管 Rime 输入法 安装五笔输入法 教程
  • orm2 中文文档 3.1 模型属性
  • Spring-boot 启动时碰到的错误
  • vue的全局变量和全局拦截请求器
  • 初识MongoDB分片
  • 观察者模式实现非直接耦合
  • 后端_ThinkPHP5
  • 基于阿里云移动推送的移动应用推送模式最佳实践
  • 聊聊spring cloud的LoadBalancerAutoConfiguration
  • 马上搞懂 GeoJSON
  • 问:在指定的JSON数据中(最外层是数组)根据指定条件拿到匹配到的结果
  • 硬币翻转问题,区间操作
  • 用Visual Studio开发以太坊智能合约
  • 远离DoS攻击 Windows Server 2016发布DNS政策
  • 交换综合实验一
  • "无招胜有招"nbsp;史上最全的互…
  • ## 临床数据 两两比较 加显著性boxplot加显著性
  • #免费 苹果M系芯片Macbook电脑MacOS使用Bash脚本写入(读写)NTFS硬盘教程
  • $.ajax()
  • (3)nginx 配置(nginx.conf)
  • (Python) SOAP Web Service (HTTP POST)
  • (Redis使用系列) Springboot 使用redis实现接口幂等性拦截 十一
  • (笔试题)分解质因式
  • (二)七种元启发算法(DBO、LO、SWO、COA、LSO、KOA、GRO)求解无人机路径规划MATLAB
  • (附程序)AD采集中的10种经典软件滤波程序优缺点分析
  • (六)vue-router+UI组件库
  • (原)记一次CentOS7 磁盘空间大小异常的解决过程
  • . NET自动找可写目录
  • .mat 文件的加载与创建 矩阵变图像? ∈ Matlab 使用笔记
  • .Net 8.0 新的变化
  • .NET 跨平台图形库 SkiaSharp 基础应用
  • .NET 指南:抽象化实现的基类
  • .NET/C# 将一个命令行参数字符串转换为命令行参数数组 args
  • .NetCore实践篇:分布式监控Zipkin持久化之殇
  • .Net多线程总结