Python集成EMQX

本文涉及的产品
模型训练 PAI-DLC,5000CU*H 3个月
交互式建模 PAI-DSW,5000CU*H 3个月
模型在线服务 PAI-EAS,A10/V100等 500元 1个月
简介: **EMQX**是一款实现了MQTT协议的高性能消息服务器软件,支持MQTT 3.1、3.1.1及5.0等协议,并兼容HTTP、QUIC和WebSocket等多种协议,确保广泛设备接入。作为全球下载量超千万的分布式物联网MQTT服务器,EMQX能支持单集群1亿设备连接,消息分发时延低于1毫秒,适用于构建关键业务的IoT平台与应用。EMQX具备SSL/TLS加密、双向认证、基于SQL的规则引擎等功能,并采用无主分布式架构确保高可用性和水平扩展性。提供开源版与企业版自托管方案及云服务选项,满足不同需求。通过Python SDK(如Paho-MQTT)可轻松集成,实现消息的发布与订阅。

[toc]

简介

EMQX是一款实现了MQTT协议的消息服务器软件

MQTT

MQTT 是物联网消息传输标准协议,其采用极其轻量级的发布订阅消息模型,以可扩展、可靠且高效的方式连接物联网设备。

EMQX

EMQX 是一款全球下载量超千万的大规模分布式物联网 MQTT 服务器,单集群支持 1 亿物联网设备连接,消息分发时延低于 1 毫秒。为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业构建关键业务的 IoT 平台与应用。

EMQX 支持多种协议,包括 MQTT (3.1、3.1.1 和 5.0)、HTTP、QUIC 和 WebSocket 等,保证各种网络环境和硬件设备的可访问性。EMQX 还提供了全面的 SSL/TLS 功能支持,比如双向认证以及多种身份验证机制,为物联网设备和应用程序提供可靠和高效的通信基础设施。

内置基于 SQL 的规则引擎,EMQX 可以实时提取、过滤、丰富和转换物联网数据。此外,EMQX 采用了无主分布式架构,以确保高可用性和水平扩展性,并提供操作友好的用户体验和出色的可观测性。

优势:

  • 超大规模
  • 高性能
  • 低延迟
  • 支持MQTT5.0协议
  • 高可用
  • 云原生
  • ......

EMQX版本

自托管

  • EMQX 开源版:支持 Apache Version 2.0 ,MQTT over QUIC,数据存储在内存中,支持 Webhook 和 MQTT 数据桥接,多协议网关支持(包括 MQTT-SN、STOMP 和 CoAP),有社区和论坛,商业许可证(商业源代码许可证)。

  • EMQX 企业版:基于 RocksDB 的会话持久化,与 40 多种企业系统双向数据集成(包括 Kafka/Confluent、Timescale、InfluxDB、PostgreSQL、Redis 等),审计日志和单点登录,基于角色的访问控制 (RBAC),文件传输,消息编解码,多协议网关支持(包括额外支持 OCPP、JT/808 和 GBT32960),5x8 技术支持。

云服务

  • EMQX Cloud Serverless:免费使用 Serverless,每月免费的使用额度,最高 1000 同时在线连接。
  • EMQX Cloud 专有版:14 天免费试用,极速部署,自动伸缩,按小时计费,主流云平台多区域部署,多种连接规格可选(连接无上限),VPC 对等连接、数据集成等更多功能,40+ 种完备的数据集成链路,7x24 技术支持。

私有部署

私有部署

EMQX发布/订阅消息模型

  • 点对点通信
  • 广播
  • 多对一:从大量端点整合数据,例如在智能家居场景中,可以从卧室、客厅、厨房各自的topic中通过共享订阅的方式订阅它们的温度数据
  • 通配符模型

EMQX主题及通配符

主题

MQTT 主题本质上是一个 UTF-8 编码的字符串,是 MQTT 协议进行消息路由的基础。MQTT 主题类似 URL 路径,使用斜杠 / 进行分层。分层之后可以使用共享订阅,一个订阅者可以同时聚合获取多个发布者发布到多个主题上面的消息。

为了避免歧义且易于理解,通常不建议主题以 / 开头或结尾,例如 /chat 或 chat/ 都是不推荐的。

系统主题:
以(\$SYS/)开头的主题为系统主题,系统主题主要用于获取 MQTT 服务器自身运行状态、消息统计、客户端上下线事件等数据。目前,MQTT 协议暂未明确规定 系统主题 的标准。

通配符

MQTT 主题通配符包含单层通配符 + 及多层通配符 #,主要用于客户端一次订阅多个主题。

通配符只能用于订阅,不能用于发布

单层通配符

加号 (“+” U+002B) 是用于单个主题层级匹配的通配符。在使用单层通配符时,单层通配符必须占据整个层级,例如:


//有效
+ 

//有效
sensor/+ 

//有效
sensor/+/temperature 

//无效(没有占据整个层级)
sensor+

如果客户端订阅了主题 sensor/+/temperature,将会收到以下主题的消息:


sensor/1/temperature
sensor/2/temperature
...
sensor/n/temperature

但是不会匹配以下主题:


sensor/temperature
sensor/bedroom/1/temperature

多层通配符

井字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后一个字符,例如:


//有效,匹配所有主题
# 

//有效
sensor/# 

//无效(没有占据整个层级)
sensor/bedroom# 


//无效(不是主题最后一个字符)
sensor/#/temperature

如果客户端订阅主题 senser/#,它将会收到以下主题的消息:


sensor
sensor/temperature
sensor/1/temperature

主题使用建议

  • 不建议使用 # 订阅所有主题;
  • 不建议主题以 / 开头或结尾,例如 /chat 或 chat/;
  • 不建议在主题里添加空格及非 ASCII 特殊字符;
  • 同一主题层级内建议使用下划线 _ 或横杆 - 连接单词(或者使用驼峰命名);
  • 尽量使用较少的主题层级;
  • 当使用通配符时,将唯一值的主题层(例如设备号)越靠近第一层越好。例如,device/00000001/command/# 比device/command/00000001/# 更好。

python集成

假设emqx的broker(即server端)已经安装好了

安装paho-mqtt


pip3 install  paho-mqtt

推送消息

新建 push.py 文件,内容如下,每2s向 test/1/osd 这个topic 中推送一条消息,消息内容包含一个时间戳



import paho.mqtt.client as mqtt
import time
from datetime import datetime

# MQTT 连接设置
BROKER = "192.168.1.82"
PORT = 1883
TOPIC = "test/1/osd"
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")

# 创建 MQTT 客户端实例
client = mqtt.Client()
client.on_connect = on_connect

# 连接到 MQTT 代理
client.connect(BROKER, PORT, 60)

# 启动网络循环
client.loop_start()
try:
    while True:
        # 发布消息到主题
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        message = f"Hello from Program A! Timestamp: {timestamp}"
        client.publish(TOPIC, message)
        print(f"Message '{message}' published to topic '{TOPIC}'")
        # 等待 10 秒
        time.sleep(2)
except KeyboardInterrupt:
    print("Exiting...")
finally:
    client.loop_stop()
    client.disconnect()

接收消息

新建 subscribe.py 文件,内容如下:



import paho.mqtt.client as mqtt

# MQTT 连接设置
BROKER = "192.168.1.82"
PORT = 1883
TOPIC = "test/1/osd"
def on_connect(client, userdata, flags, rc):
    print(f"Connected with result code {rc}")

    # 订阅主题
    client.subscribe(TOPIC)
    print(f"Subscribed to topic '{TOPIC}'")
def on_message(client, userdata, message):
    print(f"Received message '{message.payload.decode()}' on topic '{message.topic}'")

# 创建 MQTT 客户端实例
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message

# 连接到 MQTT 代理
client.connect(BROKER, PORT, 60)

# 启动网络循环
client.loop_forever()

引用

相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
1月前
|
前端开发 JavaScript UED
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
通过在Django项目中集成Channels和WebSocket,我们能够为前后端分离的应用添加实时通信功能,实现诸如在线聊天、实时数据更新等交互式场景。这不仅增强了应用的功能性,也提升了用户体验。随着实时Web应用的日益普及,掌握Django Channels和WebSocket的集成将为开发者开启新的可能性,推动Web应用的发展迈向更高层次的实时性和交互性。
60 1
|
4月前
|
机器学习/深度学习 IDE 开发工具
Python集成开发环境的选择
【7月更文挑战第6天】Python集成开发环境的选择
66 2
|
28天前
|
SQL 机器学习/深度学习 数据库
SQL与Python集成:数据库操作无缝衔接
在开始之前,确保你已经安装了必要的Python库,如`sqlite3`(用于SQLite数据库)或`psycopg2`(用于PostgreSQL数据库)。这些库提供了Python与SQL数据库之间的接口。
|
24天前
|
SQL 机器学习/深度学习 数据采集
SQL与Python集成:数据库操作无缝衔接2a.bijius.com
Python与SQL的集成是现代数据科学和工程实践的核心。通过有效的数据查询、管理与自动化,可以显著提升数据分析和决策过程的效率与准确性。随着技术的不断发展,这种集成的应用场景将更加广泛,为数据驱动的创新提供更强大的支持。
|
24天前
|
SQL 机器学习/深度学习 数据库
SQL与Python集成:数据库操作无缝衔接
1. Python与SQL集成的关键步骤 在开始之前,确保你已经安装了必要的Python库,如`sqlite3`(用于SQLite数据库)或`psycopg2`(用于PostgreSQL数据库)。这些库提供了Python与SQL数据库之间的接口。
|
30天前
|
SQL 云安全 监控
通过 Python 和 SQL 集成加强云环境
通过 Python 和 SQL 集成加强云环境
28 0
|
4月前
|
Serverless 语音技术 开发工具
函数计算操作报错合集之怎么何集成nls tts python sdk
在使用函数计算服务(如阿里云函数计算)时,用户可能会遇到多种错误场景。以下是一些常见的操作报错及其可能的原因和解决方法,包括但不限于:1. 函数部署失败、2. 函数执行超时、3. 资源不足错误、4. 权限与访问错误、5. 依赖问题、6. 网络配置错误、7. 触发器配置错误、8. 日志与监控问题。
|
4月前
|
前端开发 JavaScript API
探索Python Django中的WebSocket集成:为前后端分离应用添加实时通信功能
【7月更文挑战第17天】现代Web开发趋势中,前后端分离配合WebSocket满足实时通信需求。Django Channels扩展了Django,支持WebSocket连接和异步功能。通过安装Channels、配置设置、定义路由和消费者,能在Django中实现WebSocket交互。前端使用WebSocket API连接后端,实现双向数据流,如在线聊天功能。集成Channels提升Web应用的实时性和用户体验,适应实时交互场景的需求。**
179 6
|
4月前
|
机器学习/深度学习 IDE 开发工具
Python集成开发环境
【7月更文挑战第6天】Python集成开发环境
65 1
|
4月前
|
机器学习/深度学习 PyTorch TensorFlow
在深度学习中,数据增强是一种常用的技术,用于通过增加训练数据的多样性来提高模型的泛化能力。`albumentations`是一个强大的Python库,用于图像增强,支持多种图像变换操作,并且可以与深度学习框架(如PyTorch、TensorFlow等)无缝集成。
在深度学习中,数据增强是一种常用的技术,用于通过增加训练数据的多样性来提高模型的泛化能力。`albumentations`是一个强大的Python库,用于图像增强,支持多种图像变换操作,并且可以与深度学习框架(如PyTorch、TensorFlow等)无缝集成。