1.什么是MQTT
MQTT(Message Queuing Telemetry Transport,消息队列遥测传输协议),是一种基于发布/订阅(publish/subscribe)模式的”轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。MQTT最大优点在于,可以以极少的代码和有限的带宽,为连接远程设备提供实时可靠的消息服务。作为一种低开销、低带宽占用的即时通讯协议,使其在物联网、小型设备、移动应用等方面有较广泛的应用
我们可以拿HTTP协议和MQTT协议做对比来更好地理解什么是MQTT. 按照OSI网络分层模型,IP是网络层协议,TCP是传输层协议,而HTTP和MQTT是应用层的协议.在这三者之间, IP/TCP是HTTP和MQTT底层的协议.即MQTT实际上是一个传输层协议.
2. MQTT的使用场景
2.1 http协议和websocket协议
要理解为什么要用mqtt,首先要说一下http协议的不足:
- HTTP客户端和服务器之间的交互是采用请求/应答模式(后来的HTTP1.1支持持久连接,半双工;HTTP2.0是全双工).且消息头内容较多
- HTTP通信方式问题,HTTP的请求/应答方式的会话都是客户端发起的,缺乏服务器通知客户端的机制,在需要通知的场景,如聊天室,游戏,客户端应用需要不断地轮询服务器
为了解决http的上述问题,websocket应运而生(websocket也是使用IP/TCP的传输层协议),websocket的特点:
- websocket建立连接时,数据是通过http传输的,建立连接后就不需要http协议了
- websocket建立连接后就是全双工模式,也是基于tcp协议
- 建立连接之后,不必在浏览器(客户端)发送request之后服务器才能发送信息到浏览器,这时候服务器有主动权,可以随时发消息给浏览器(客户端)
- 发送的信息中不必带有head部分信息了,相对于http来说,降低了服务器的压力,极大的减少了不必要的网络流量与延迟
http和websocket的区别与联系:
一. 联系
- 都是基于TCP协议
- websocket是基于http的他们的兼容性都很好
- 在连接的建立过程中对错误的处理方式相同
- 都使用 Request/Response模型进行连接的建立
- 都可以在网络中传输数据
二. 区别
- websocket是持久连接,http 是短连接(http可以通过Ajax一直发送请求和长轮询保持一段时间内的连接,但本质上还是短连接);
- websocket的协议是以 ws/wss 开头,http 对应的是 http/https;
- websocket是有状态的双向连接,http 是无状态的单向连接;
- websocket连接建立之后,数据的传输使用帧来传递,不再需要Request消息;
- websocket是可以跨域的。
websocket其实以及弥补了http很多的不足,那么为什么还会诞生MQTT呢?个人看来,websocket的诞生更多是为了解决http不能解决的问题,是http的互补,而MQTT更像是专门为了工业互联网的应用场景诞生的,下面我们学习下MQTT的特点,大家可以更好地体会一下.
2.2 MQTT协议
2.2.1 设计规范
根据物联网特殊的使用环境,MQTT遵循一下设计规范
- (1)精简,不添加可有可无的功能;
- (2)发布/订阅(Pub/Sub)模式,方便消息在传感器之间传递;
- (3)允许用户动态创建主题,零运维成本;
- (4)把传输量降到最低以提高传输效率;
- (5)把低带宽、高延迟、不稳定的网络等因素考虑在内;
- (6)支持连续的会话控制;
- (7)理解客户端计算能力可能很低;
- (8)提供服务质量管理;
- (9)假设数据不可知,不强求传输数据的类型与格式,保持灵活性。
2.2.2 主要特性
-
使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
这一点很类似于XMPP,但是MQTT的信息冗余远小于XMPP,,因为XMPP使用XML格式文本来传递数据。
-
对负载内容屏蔽的消息传输
-
使用TCP/IP提供网络连接。
主流的MQTT是基于TCP连接进行数据推送的,但是同样有基于UDP的版本,叫做MQTT-SN。这两种版本由于基于不同的连接方式,优缺点自然也就各有不同了
-
HTTP相比,MQTT协议确保了高传输保证。有3个级别的服务质量(QoS):
– 最多一次:保证尽力交付。 当 QoS 为 0 时,消息的分发依赖于底层网络的能力。发布者只会发布一次消息,接收者不会应答消息,发布者也不会储存和重发消息。消息在这个等级下具有最高的传输效率,但可能送达一次也可能根本没送达。
– 至少一次:保证消息至少传送一次。但是消息也可以不止一次传递。当 QoS 为 1 时,可以保证消息至少送达一次。MQTT 通过简单的 ACK 机制来保证 QoS 1。
- 发送者:发布消息,并等待接收者的 PUBACK 报文的应答,在规定的时间内没有收到 PUBACK 的应答,发布者会将消息的 DUP 置为1 并重发消息。
- 接受者:接收到 QoS 为 1 的消息时应该回应 PUBACK 报文,可能因为网络延迟等原因没有及时发出,这时接收者可能会多次接受同一个消息,无论 DUP标志如何,接收者都会将收到的消息当作一个新的消息并发送 PUBACK 报文应答。
核心
:就是发送消息的时候,接受者需要确认一次,规定时间内没有确认就会重新发。如果使用这种方式,写业务的时候需要保证幂等性
。– 恰好一次:保证每个消息只被对方接收一次.当 QoS 为 2 时,发布者和订阅者通过两次会话来保证消息只被传递一次,这是最高等级的服务质量,消息丢失和重复都是不可接受的。使用这个服务质量等级会有额外的开销。
- 发送者:发布 QoS 为 2 的消息之后,消息储存起来并等待接收者回复 PUBREC 的消息。
- 接受者:收到一条 QoS 为 2 的消息时,他会处理此消息并返回一条 PUBREC 进行应答。
- 发送者:收到 PUBREC 消息后,丢弃掉之前的发布消息。保存 PUBREC 消息,并应答一个 PUBREL。等待接收者回复 PUBCOMP 消息
- 接受者:当接收者收到 PUBREL 消息之后,它会丢弃掉所有已保存的状态,并回复 PUBCOMP。
- 发送者:当发送者收到 PUBCOMP 消息之后会清空之前所保存的状态。
核心
:发送消息的时候,接受者需要确认两次,来保证消息确实已经送到。无论在传输过程中何时出现丢包,发送端都负责重发上一条消息。不管发送端是 Publisher(发送端) 还是 Broker(服务器),都是如此。因此,接收端也需要对每一条命令消息都进行应答。
-
MQTT还为用户提供Last will&Testament和Retained消息的选项。第一个意味着在客户端意外断开连接的情况下,所有订阅的客户端都将从代理获得消息。保留消息意味着新订阅的客户端将立即获得状态更新
-
低协议开销,MQTT 的独特之处在于,它的每消息标题可以短至 2 个字节。MQ 和 HTTP 都拥有高得多的每消息开销。对于 HTTP,为每个新请求消息重新建立 HTTP 连接会导致重大的开销。MQ 和 MQTT 所使用的永久连接显著减少了这一开销。
-
对不稳定网络的容忍,MQTT 和 MQ 能够从断开等故障中恢复,而且没有进一步的代码需求。但是,HTTP 无法原生地实现此目的,需要客户端重试编码,这可能增加幂等性问题
-
保活心跳(Keep Alive), MQTT 客户端向服务器发起 CONNECT 请求时,通过 Keep Alive 参数设置保活周期。 客户端在无报文发送时,按 Keep Alive 周期定时发送 2 字节的 PINGREQ 心跳报文,服务端收到 PINGREQ 报文后,回复 2 字节的 PINGRESP 报文。 服务端在 1.5 个心跳周期内,既没有收到客户端发布订阅报文,也没有收到 PINGREQ 心跳报文时,将断开客户端连接。
-
保留消息(Retained Message), MQTT 客户端向服务器发布(PUBLISH)消息时,可以设置保留消息(Retained Message)标志。保留消息会驻留在消息服务器,后来的订阅者订阅主题时可以接收到最新一条
(注意,是只有最近的一条)
保留消息。 -
遗嘱消息(Will Message)
-
低功耗,MQTT 是专门针对低功耗目标而设计的。HTTP 的设计没有考虑此因素,因此增加了功耗
-
MQTT 客户端都已在大量平台上实现。(http同样适用大部分平台)
这些特性很多都是http所不具备的,MQTT 可以很好地解决物联网环境 1.网络代价昂贵,带宽低、不可靠;2. 在嵌入设备中运行,处理器和内存资源有限等问题, 根据3G网络的测量结果,MQTT的吞吐量比HTTP快93倍。 因此,在物联网环境中mqtt有着无可比拟的优势
2.2.3 总结
3.使用说明
3.1docker部署EMQX
EMQX则是实现mqtt消息代理分发的一个消息中间件。 部署很简单,分两步走:
-
获取 Docker 镜像
docker pull emqx/emqx:5.0.8
-
启动 Docker 容器
docker run -d --name emqx -p 1883:1883 -p 8083:8083 -p 8084:8084 -p 8883:8883 -p 18083:18083 emqx/emqx:5.0.8
各个服务端口说明:各个服务端口说明:
- 1883:MQTT 协议端口
- 8883:MQTT/SSL 端口
- 8083:MQTT/WebSocket 端口
- 8080:HTTP API 端口
- 8084 WSS端口
- 18083:Dashboard 管理控制台端口
EMQX 提供了 Dashboard 以方便用户管理设备与监控相关指标。控制台地址为:http://localhost:18083,默认用户名密码为:admin/public,可以在 etc/plugins/emqx_dashboard.conf 配置文件中修改默认密码。(国人开发的软件用户体验是真好)
3.2 SpringBoot整合mqtt&emqx
3.2.1 创建SpringBoot项目,添加pom引入jar包
org.springframework.integration
spring-integration-stream
org.springframework.integration
spring-integration-mqtt
org.springframework.boot
spring-boot-starter-integration
3.2.2 配置application.yml文件的Emqx参数
mqtt:
#MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
host: tcp://127.0.0.1:11883
#MQTT-连接服务器默认客户端ID
clientid: mqtt_id
#MQTT-用户名
username: admin
#MQTT-密码
password: admin
#MQTT-默认的消息推送主题,实际可在调用接口时指定
topic: test
#连接超时
timeout: 1000
#设置会话心跳时间
keepalive: 100
3.2.3 读取配置参数
package com.jscoe.mqtt;
import lombok.Data;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
@Component
@Configuration
@Data
@ConfigurationProperties("mqtt")
public class MqttConfiguration {
@Autowired
private MqttCustomerClient mqttCustomerClient;
private String host;
private String clientid;
private String username;
private String password;
private String topic;
private int timeout;
private int keepalive;
@Bean
public MqttCustomerClient getMqttCustomerClient() {
mqttCustomerClient.connect(host, clientid, username, password, timeout,keepalive);
// 以/#结尾表示订阅所有以test开头的主题
mqttCustomerClient.subscribe("test/#");
return mqttCustomerClient;
}
}
3.2.4 为Mqtt客户端实例提供回调函数
package com.jscoe.mqtt;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.springframework.stereotype.Component;
/**
* @author rongyu
* @description 消费监听
* @date 2022/9/29
*/
@Component
public class PushCallback implements MqttCallback {
private static MqttClient mqttClient;
/**
* 在断开连接时调用
* @param throwable
*/
@Override
public void connectionLost(Throwable throwable) {
if (mqttClient == null || !mqttClient.isConnected()) {
System.out.println("连接断开,正在重连....");
}
}
/**
* 消息到达后,调用
* @param topic
* @param message
* @throws Exception
*/
@Override
public void messageArrived(String topic, MqttMessage message) throws Exception {
System.out.println("接收消息主题 : " + topic);
System.out.println("接收消息Qos : " + message.getQos());
System.out.println("接收消息内容 : " + new String(message.getPayload()));
}
/**
* 消息发送成功后,调用
* @param token
*/
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------" + token.isComplete());
}
}
3.2.5 封装工具类
package com.jscoe.mqtt;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author rongyu
* @description 消费监听
* @date 2022/9/29
*/
@Slf4j
@Component
public class MqttCustomerClient {
@Autowired
private PushCallback pushCallback;
private static MqttClient client;
public static MqttClient getClient(){
return client;
}
public static void setClient(MqttClient client){
MqttCustomerClient.client=client;
}
/**
* 客户端连接
*
* @param host ip+端口
* @param clientID 客户端Id
* @param username 用户名
* @param password 密码
* @param timeout 超时时间
* @param keeplive 保留数
*/
public void connect(String host,String clientID,String username,String password,int timeout,int keeplive){
MqttClient client;
try {
client=new MqttClient(host,clientID,new MemoryPersistence());
MqttConnectOptions options=new MqttConnectOptions();
options.setCleanSession(true);
options.setUserName(username);
options.setPassword(password.toCharArray());
options.setConnectionTimeout(timeout);
options.setKeepAliveInterval(keeplive);
MqttCustomerClient.setClient(client);
try {
client.setCallback(pushCallback);
client.connect(options);
}catch (Exception e){
e.printStackTrace();
}
}catch (Exception e){
e.printStackTrace();
}
}
/**
* 发布,默认qos为0,非持久化
* @param topic
* @param pushMessage
*/
public void pushlish(String topic,String pushMessage){
pushlish(0,false,topic,pushMessage);
}
/**
* 发布
*
* @param qos 连接方式
* @param retained 是否保留
* @param topic 主题
* @param pushMessage 消息体
*/
public void pushlish(int qos,boolean retained,String topic,String pushMessage){
MqttMessage message=new MqttMessage();
message.setQos(qos);
message.setRetained(retained);
message.setPayload(pushMessage.getBytes());
MqttTopic mqttTopic= MqttCustomerClient.getClient().getTopic(topic);
if(null== mqttTopic){
log.error("topic not exist");
}
MqttDeliveryToken token;
try {
token=mqttTopic.publish(message);
token.waitForCompletion();
}catch (MqttPersistenceException e){
e.printStackTrace();
}catch (MqttException e){
e.printStackTrace();
}
}
/**
* 订阅某个主题,qos默认为0
* @param topic
*/
public void subscribe(String topic){
log.error("开始订阅主题" + topic);
subscribe(topic,0);
}
public void subscribe(String topic,int qos){
try {
MqttCustomerClient.getClient().subscribe(topic,qos);
}catch (MqttException e){
e.printStackTrace();
}
}
}
3.2.6 测试
package com.jscoe.core.modules.mqtt;
import com.jscoe.mqtt.MqttCustomerClient;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class MqttTest {
@Autowired
private MqttCustomerClient mqttCustomerClient;
@Test
public void pushlish() {
mqttCustomerClient.pushlish("test/device1", "hello mqtt............");
// for (int i = 0; i
// mqttCustomerClient.pushlish("test/device1", "hello mqtt............" + i);
// try {
// Thread.sleep(3000);
// } catch (InterruptedException e) {
// e.printStackTrace();
// }
// }
}
}
结果
[2022-09-29 17:14:32.572] [ithere-api] [local] [trade-id] ERROR 38980 --- [ main] com.jscoe.mqtt.MqttCustomerClient : 开始订阅主题test/#
deliveryComplete---------true
接收消息主题 : test/device1
接收消息Qos : 0
接收消息内容 : hello mqtt............
同时emqx的图形化界面也会同步显示内容,图片展示不方便,就不放了
参考文献
菜鸟教程-MQTT 入门介绍:https://www.runoob.com/w3cnote/mqtt-intro.html
CSDN博主「向上的小强」的原创文章 原文链接:https://blog.csdn.net/weixin_43647723/article/details/116605960
课程背景 物联网应用开发,并不像 Web 开发那样有固定的模式和框架可以学习,开发者往往还是需要从协议这一层慢慢往上搭积木,学习曲线比较陡。本课程结合物联网应用开发常用的设计模式以及作者多年的开发经验,带你从 0 开始搭建一个物联网平台,希望本课程所体现的架构…