[toc]
简介
EMQX是一款实现了MQTT协议的消息服务器软件。
MQTT
MQTT 是物联网消息传输标准协议,其采用极其轻量级的发布订阅消息模型,以可扩展、可靠且高效的方式连接物联网设备。
EMQX
EMQX 是一款全球下载量超千万的大规模分布式物联网 MQTT 服务器,单集群支持 1 亿物联网设备连接,消息分发时延低于 1 毫秒。为高可靠、高性能的物联网实时数据移动、处理和集成提供动力,助力企业构建关键业务的 IoT 平台与应用。
EMQX 支持多种协议,包括 MQTT (3.1、3.1.1 和 5.0)、HTTP、QUIC 和 WebSocket 等,保证各种网络环境和硬件设备的可访问性。EMQX 还提供了全面的 SSL/TLS 功能支持,比如双向认证以及多种身份验证机制,为物联网设备和应用程序提供可靠和高效的通信基础设施。
内置基于 SQL 的规则引擎,EMQX 可以实时提取、过滤、丰富和转换物联网数据。此外,EMQX 采用了无主分布式架构,以确保高可用性和水平扩展性,并提供操作友好的用户体验和出色的可观测性。
优势:
- 超大规模
- 高性能
- 低延迟
- 支持MQTT5.0协议
- 高可用
- 云原生
- ......
EMQX版本
自托管
EMQX 开源版:支持 Apache Version 2.0 ,MQTT over QUIC,数据存储在内存中,支持 Webhook 和 MQTT 数据桥接,多协议网关支持(包括 MQTT-SN、STOMP 和 CoAP),有社区和论坛,商业许可证(商业源代码许可证)。
EMQX 企业版:基于 RocksDB 的会话持久化,与 40 多种企业系统双向数据集成(包括 Kafka/Confluent、Timescale、InfluxDB、PostgreSQL、Redis 等),审计日志和单点登录,基于角色的访问控制 (RBAC),文件传输,消息编解码,多协议网关支持(包括额外支持 OCPP、JT/808 和 GBT32960),5x8 技术支持。
云服务
- EMQX Cloud Serverless:免费使用 Serverless,每月免费的使用额度,最高 1000 同时在线连接。
- EMQX Cloud 专有版:14 天免费试用,极速部署,自动伸缩,按小时计费,主流云平台多区域部署,多种连接规格可选(连接无上限),VPC 对等连接、数据集成等更多功能,40+ 种完备的数据集成链路,7x24 技术支持。
私有部署
私有部署
EMQX发布/订阅消息模型
- 点对点通信
- 广播
- 多对一:从大量端点整合数据,例如在智能家居场景中,可以从卧室、客厅、厨房各自的topic中通过共享订阅的方式订阅它们的温度数据
- 通配符模型
EMQX主题及通配符
主题
MQTT 主题本质上是一个 UTF-8 编码的字符串,是 MQTT 协议进行消息路由的基础。MQTT 主题类似 URL 路径,使用斜杠 / 进行分层。分层之后可以使用共享订阅,一个订阅者可以同时聚合获取多个发布者发布到多个主题上面的消息。
为了避免歧义且易于理解,通常不建议主题以 / 开头或结尾,例如 /chat 或 chat/ 都是不推荐的。
系统主题:
以(\$SYS/)开头的主题为系统主题,系统主题主要用于获取 MQTT 服务器自身运行状态、消息统计、客户端上下线事件等数据。目前,MQTT 协议暂未明确规定 系统主题 的标准。
通配符
MQTT 主题通配符包含单层通配符 + 及多层通配符 #,主要用于客户端一次订阅多个主题。
通配符只能用于订阅,不能用于发布
单层通配符
加号 (“+” U+002B) 是用于单个主题层级匹配的通配符。在使用单层通配符时,单层通配符必须占据整个层级,例如:
//有效
+
//有效
sensor/+
//有效
sensor/+/temperature
//无效(没有占据整个层级)
sensor+
如果客户端订阅了主题 sensor/+/temperature,将会收到以下主题的消息:
sensor/1/temperature
sensor/2/temperature
...
sensor/n/temperature
但是不会匹配以下主题:
sensor/temperature
sensor/bedroom/1/temperature
多层通配符
井字符号(“#” U+0023)是用于匹配主题中任意层级的通配符。多层通配符表示它的父级和任意数量的子层级,在使用多层通配符时,它必须占据整个层级并且必须是主题的最后一个字符,例如:
//有效,匹配所有主题
#
//有效
sensor/#
//无效(没有占据整个层级)
sensor/bedroom#
//无效(不是主题最后一个字符)
sensor/#/temperature
如果客户端订阅主题 senser/#,它将会收到以下主题的消息:
sensor
sensor/temperature
sensor/1/temperature
主题使用建议
- 不建议使用 # 订阅所有主题;
- 不建议主题以 / 开头或结尾,例如 /chat 或 chat/;
- 不建议在主题里添加空格及非 ASCII 特殊字符;
- 同一主题层级内建议使用下划线 _ 或横杆 - 连接单词(或者使用驼峰命名);
- 尽量使用较少的主题层级;
- 当使用通配符时,将唯一值的主题层(例如设备号)越靠近第一层越好。例如,device/00000001/command/# 比device/command/00000001/# 更好。
python集成
假设emqx的broker(即server端)已经安装好了
安装paho-mqtt
pip3 install paho-mqtt
推送消息
新建 push.py 文件,内容如下,每2s向 test/1/osd 这个topic 中推送一条消息,消息内容包含一个时间戳
import paho.mqtt.client as mqtt
import time
from datetime import datetime
# MQTT 连接设置
BROKER = "192.168.1.82"
PORT = 1883
TOPIC = "test/1/osd"
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# 创建 MQTT 客户端实例
client = mqtt.Client()
client.on_connect = on_connect
# 连接到 MQTT 代理
client.connect(BROKER, PORT, 60)
# 启动网络循环
client.loop_start()
try:
while True:
# 发布消息到主题
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
message = f"Hello from Program A! Timestamp: {timestamp}"
client.publish(TOPIC, message)
print(f"Message '{message}' published to topic '{TOPIC}'")
# 等待 10 秒
time.sleep(2)
except KeyboardInterrupt:
print("Exiting...")
finally:
client.loop_stop()
client.disconnect()
接收消息
新建 subscribe.py 文件,内容如下:
import paho.mqtt.client as mqtt
# MQTT 连接设置
BROKER = "192.168.1.82"
PORT = 1883
TOPIC = "test/1/osd"
def on_connect(client, userdata, flags, rc):
print(f"Connected with result code {rc}")
# 订阅主题
client.subscribe(TOPIC)
print(f"Subscribed to topic '{TOPIC}'")
def on_message(client, userdata, message):
print(f"Received message '{message.payload.decode()}' on topic '{message.topic}'")
# 创建 MQTT 客户端实例
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
# 连接到 MQTT 代理
client.connect(BROKER, PORT, 60)
# 启动网络循环
client.loop_forever()