MQTT Retained Message 为什么新订阅者会收到旧消息?
MQTT retained message 是 Broker 为某个 Topic 保存的“最后一条状态消息”。发布端设置 retain=true 后,Broker 会把这条消息存下来;后来才订阅该 Topic 的客户端,不用等设备再次上报,也会立刻收到这条保留消息。它适合表达“当前状态”,不适合表达“历史事件”。温度、开关状态、设备配置、系统版本这类值很适合;告警流水、订单事件、聊天消息就不适合,否则新订阅者会误以为旧事件刚刚发生。
它到底解决什么问题?
没有 retained message 时,新客户端订阅 sensor/123/temperature 后只能等待下一次发布。如果传感器 10 分钟才上报一次,页面刚打开时就会空白 10 分钟。设置保留消息后,Broker 会在订阅成功时先补发最新值,例如 {"value":25.5,"unit":"C","ts":1710000000}。注意每个 Topic 只保留一条,新的 retained 消息会覆盖旧的,不能把它当数据库用。它的价值在于让订阅方快速拿到“现在是什么”,而不是让订阅方知道“刚才发生过什么”。在设备管理后台、Home Assistant、车联网网关里,这个差异很关键:状态页需要立刻显示当前值,审计页才需要完整历史。
可以用 mosquitto 快速验证:
bashmosquitto_pub -h localhost -t sensor/123/temperature -m '{"value":25.5}' -r mosquitto_sub -h localhost -t sensor/123/temperature -v mosquitto_pub -h localhost -t sensor/123/temperature -n -r
第三条命令发布空 Payload 并带 retain,用来清除该 Topic 的保留消息。很多踩坑都出在这里:只发布空消息但忘了 -r,Broker 不会删除 retained 记录。另一个常见问题是以为订阅通配符时 Broker 只返回一条,实际上订阅 sensor/+/temperature 时,匹配到的每个具体 Topic 如果有 retained 记录,都可能各发一条。第一次接入大量设备时,这会让订阅端瞬间收到一批初始化消息,所以消费端要能区分初始化流量和实时流量。服务端也要限制 retained 消息大小,别把整份配置文件、证书或图片塞进去;大对象更适合放对象存储,MQTT 里只放版本号和下载地址。
追问
Retained message 和普通消息有什么区别?
普通消息只发给当时在线且已订阅的客户端,错过就错过了;retained message 会被 Broker 留一份,后来的订阅者也能拿到。这个机制的取舍是牺牲一部分 Broker 存储,换来状态初始化更快。边界也很明确:它只表示“最后状态”,不保证覆盖状态变化过程。如果业务需要追溯每次变化,应该写入时序库、日志系统或消息队列,而不是堆 retained Topic。还有一种折中做法是 retained 只保存最新摘要,历史明细由后端 API 查询,这样页面既能秒开,又不会让 Broker 背负数据库职责。
QoS 会怎样影响保留消息?
保留消息发布时可以带 QoS 0、1、2,Broker 保存的也是这条消息及其 QoS。订阅者实际收到的 QoS 通常是 min(发布 QoS, 订阅 QoS),所以发布 QoS 1 不代表每个订阅者都按 QoS 1 收到。一般状态展示用 QoS 0 就够,关键配置可以用 QoS 1。QoS 2 成本更高,会增加 Broker 和客户端的握手负担,只有在重复处理代价很大、链路又必须严格确认时才值得使用。
什么场景不应该使用 retained message?
不要把告警、支付回调、门锁开门记录这类事件做成 retained message。新客户端订阅后收到旧告警,很容易触发重复通知或误判现场还在故障。也不要在高频遥测上滥用,例如每秒上报几千个不同 Topic,Broker 的 retained 索引和持久化会被放大。更稳妥的做法是只保留少量状态 Topic,把高频数据写到专门的存储系统,再由应用层按时间范围查询。
代码里怎么发布和识别 retained message?
Python 的 paho-mqtt 里,发布时传 retain=True,订阅端可以通过 msg.retain 判断这是不是 Broker 补发的旧状态。这个标志很有用,页面初始化时可以直接渲染 retained 状态,但不要把它当作“刚刚发生”的事件。上线初始化逻辑和实时事件逻辑最好分开,否则统计、告警和审计都容易被旧消息污染。示例:
pythonclient.publish('sensor/123/temperature', '{"value":25.5}', qos=1, retain=True) def on_message(client, userdata, msg): if msg.retain: print('initial state:', msg.payload.decode()) else: print('live update:', msg.payload.decode())
使用 retained message 最容易踩什么坑?
第一是忘记清理,设备下线或被删除后,旧状态仍然留在 Broker,新页面看到的还是“在线”。第二是 Topic 设计太粗,比如所有设备共用 device/status,结果后发布的设备覆盖前一个设备,排查时看起来像设备互相串线。第三是 retained 和 Last Will 混用时没有规划好在线状态:上线消息应该主动发布 online 并 retain,异常掉线再由遗嘱发布 offline 并 retain。这样新订阅者看到的才是当前状态,而不是一段过期故事;如果设备退役,还要发布空 retained 消息把状态清掉。