1. 主要实现功能
- 自动配置
- 消息自动解析
- 消息分组共享订阅
- 消息不分组共享订阅
- 消息排它订阅
- 延时发布
- 多数据源
4. 快速开始
4.1 引入依赖
dependency>
groupId>org.eclipse.pahogroupId>
artifactId>org.eclipse.paho.client.mqttv3artifactId>
version>1.2.5version>
dependency>
4.2 配置
spring:
mqtt:
emq:
client:
# 多数据源客户端名称,默认default
default:
# broker地址
host: 127.0.0.1
# 端口
port: 31883
# 用户名
username: admin
# 密码
password: 123456
更多配置如下
spring:
mqtt:
emq:
client:
# 多数据源客户端名称,默认default
default:
# broker地址
host: 127.0.0.1
# 端口
port: 31883
# 用户名
username: admin
# 密码
password: 123456
# 客户端标识,需保持全局唯一
client-id: parking_server
# 是否清除session
clean-session: false
# 连接超时时间,单位秒
connection-timeout: 10
# 心跳间隔时间,单位秒
keep-alive-interval: 10
# 全局消息质量
global-qos: 1
# 重新连接之间等待的最长时间
maxReconnect-delay: 128000
# 是否自动重新连接
automatic-reconnect: true
# 最大消息并发数量,超过此数量并发时可能会丢消息
maxInflight: 1000
4.3 开启自动配置
在启动类上增加@EnableRabbitMqAutoConfiguration
注解
import com.demo.mqttclient.anno.EnableEmqAutoConfiguration;
@SpringBootApplication
@EnableEmqAutoConfiguration
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
}
4.4 发布消息
在生产者的业务程序中,注入MQTTClient
import com.demo.mqttclient.MQTTClient;
@Resource
private MQTTClient defaultMQTTClient;
为了兼容第三方及优化内部使用逻辑,所以内置提供了两种消息发送方式。
4.4.1 第三方消息发送
public String publishHeartbeatReply() {
HeartbeatReplyMessage heartbeatReplyMessage = new HeartbeatReplyMessage();
heartbeatReplyMessage.setCmd(32896);
heartbeatReplyMessage.setExpire(1605252875L);
heartbeatReplyMessage.setDevid("095437323930030130523933");
heartbeatReplyMessage.setServer_time("1605252875");
defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage);
return "success";
}
4.4.2 内部消息发送
消息实体实现Message
接口
package com.example.test.message;
import com.demo.mqttclient.MQTTMessage;
import lombok.Data;
import java.util.UUID;
@Data
public class demoMessage implements MQTTMessage {
private String msgId = UUID.randomUUID().toString();
private String name;
private String gender;
@Override
public String getMsgId() {
return this.msgId;
}
}
然后直接调用该类的publish
方法发送即可
@GetMapping("demo/publish")
public String demoPublish() {
demoMessage demoMessage = new demoMessage();
demoMessage.setName("点都");
demoMessage.setGender("xx");
defaultMQTTClient.publish("demo/topic", demoMessage);
return "success";
}
其中存在多个重载的方法。
package com.demo.mqttclient;
import com.demo.mqttclient.enums.ShareModelEnum;
import com.demo.plugin.core.lang.json.JSONUtil;
import org.eclipse.paho.client.mqttv3.MqttMessage;
/**
* mqtt客户端
*
* @author zhangliuyang
* @date 2022/07/18
* @since 1.0.0
*/
public interface MQTTClient {
/**
* 启动客户端
*/
void start();
/**
* 关闭客户端
*/
void close();
/**
* 发布
*
* @param topic 主题
* @param message 消息
*/
default T extends MQTTMessage> void publish(String topic, T message) {
this.publish(topic, message, 1);
}
/**
* 发布
*
* @param topic 主题
* @param message 消息
* @param qos 消息质量
*/
default T extends MQTTMessage> void publish(String topic, T message, int qos) {
this.publish(topic, message, qos, 0);
}
/**
* 发布
*
* @param topic 主题
* @param message 消息
* @param qos 消息质量
* @param delay 延迟时间[unit:s, max:4294967s, condition: > 0]
*/
default T extends MQTTMessage> void publish(String topic, T message, int qos, long delay) {
this.publish(topic, message, qos, delay, false);
}
/**
* 发布
*
* @param topic 主题
* @param message 消息
* @param qos 消息质量
* @param delay 延迟时间[unit:s, max:4294967s, condition: > 0]
* @param retained 保留消息
*/
default T extends MQTTMessage> void publish(String topic, T message, int qos, long delay, boolean retained) {
MQTTMessageContext mqttMessageContext = new MQTTMessageContext();
mqttMessageContext.setId(message.getMsgId());
mqttMessageContext.setPayload(JSONUtil.write(message));
mqttMessageContext.setQos(qos);
mqttMessageContext.setDelay(delay);
mqttMessageContext.setRetained(retained);
mqttMessageContext.setTimestamp(System.currentTimeMillis());
this.publish(topic, mqttMessageContext);
}
/**
* 发布
*
* @param topic 主题
* @param messageContext 消息上下文
*/
void publish(String topic, MQTTMessageContext messageContext);
/**
* 发送到第三方
*
* @param topic 主题
* @param message 消息
*/
default void publish2ThirdParty(String topic, Object message) {
this.publish2ThirdParty(topic, 1, message);
}
/**
* 发送到第三方
*
* @param topic 主题
* @param qos 消息质量
* @param message 消息
*/
default void publish2ThirdParty(String topic, int qos, Object message) {
this.publish2ThirdParty(topic, qos, message, Constant.DEFAULT_CHARSET.name());
}
/**
* 发送到第三方
*
* @param topic 主题
* @param qos 消息质量
* @param message 消息
* @param charsetName 字符集名称
*/
default void publish2ThirdParty(String topic, int qos, Object message, String charsetName) {
this.publish2ThirdParty(topic, qos, message, charsetName, 0);
}
/**
* publish2第三方
*
* @param topic 主题
* @param qos qos
* @param message 消息
* @param charsetName 字符集名称
* @param delay 延迟时间
*/
default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay) {
this.publish2ThirdParty(topic, qos, message, charsetName, delay, false);
}
/**
* publish2第三方
*
* @param topic 主题
* @param qos qos
* @param message 消息
* @param charsetName 字符集名称
* @param delay 延迟时间
* @param retained 是否保留消息
*/
default void publish2ThirdParty(String topic, int qos, Object message, String charsetName, long delay, boolean retained) {
this.publish2ThirdParty(topic, qos, JSONUtil.toString(message).getBytes(charsetName), delay, retained);
}
/**
* 发送到第三方
*
* @param topic 主题
* @param payload 有效载荷
*/
default void publish2ThirdParty(String topic, byte[] payload) {
this.publish2ThirdParty(topic, 1, payload, 0);
}
/**
* 发送到第三方
*
* @param topic 主题
* @param qos 消息质量
* @param payload 有效载荷
*/
default void publish2ThirdParty(String topic, int qos, byte[] payload) {
this.publish2ThirdParty(topic, qos, payload, 0);
}
/**
* 发送到第三方
*
* @param topic 主题
* @param qos 消息质量
* @param payload 有效载荷
* @param delay 延迟时间
*/
default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay) {
this.publish2ThirdParty(topic, qos, payload, delay, false);
}
/**
* 发送到第三方
*
* @param topic 主题
* @param qos 消息质量
* @param payload 有效载荷
* @param delay 延迟时间
* @param retained 是否保留消息
*/
default void publish2ThirdParty(String topic, int qos, byte[] payload, long delay, boolean retained) {
MqttMessage mqttMessage = new MqttMessage();
mqttMessage.setPayload(payload);
mqttMessage.setQos(qos);
mqttMessage.setRetained(retained);
this.publish2ThirdParty(topic, delay, mqttMessage);
}
/**
* 发送到第三方
*
* @param topic 主题
* @param mqttMessage mqtt消息
* @param delay 延迟时间
*/
void publish2ThirdParty(String topic, long delay, MqttMessage mqttMessage);
/**
* 订阅
*
* @param topic 主题
* @param qos 消息质量
* @param shareModel 共享模型
* @param groupName 分组名称
* @param exclusive 排它
* @param messageHandler 消息处理程序
*/
void subscribe(String topic, int qos, ShareModelEnum shareModel, String groupName, boolean exclusive, MessageHandler messageHandler);
}
4.5 接收消息
消费者需要在消息处理类上添加@MQTTSubscriber(topics = {"npt/park/type1/server/10010"})
注解,指定要监听topics
和客户端名称即可。如果没有显示的指定客户端名称,则使用defaultMQTTClient
,使用qos
执行订阅消息质量。
当消息处理类中有多个public
方法时,需要@MQTTConsumerMethod
标记具体消费方法
package com.example.test.handler;
import com.demo.mqttclient.MessageHandler;
import com.demo.mqttclient.anno.MQTTConsumerMethod;
import com.demo.mqttclient.anno.MQTTSubscriber;
import com.demo.mqttclient.enums.ShareModelEnum;
import com.demo.plugin.core.lang.json.JSONUtil;
import com.example.test.message.DeviceStartMessage;
import com.example.test.message.PlateRecognitionReportMessage;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.util.Map;
@Slf4j
@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE)
public class ParkingMessageHandler {
public static final String TOPIC = "npt/park/type1/server/10010";
public static final String CMD_KEY = "cmd";
@MQTTConsumerMethod
public void handle(MapString, Object> message) {
log.info("handle:{}", message);
if (message.containsKey(CMD_KEY)) {
Integer cmd = (Integer) message.get(CMD_KEY);
switch (cmd) {
case 129:
handleDeviceStartMessage(message);
break;
case 140:
handlePlateRecognitionReportMessage(message);
break;
default:
log.warn("不支持此cmd:[{}]", cmd);
break;
}
} else {
log.warn("消息消费异常");
}
}
private void handleDeviceStartMessage(MapString, Object> message) {
DeviceStartMessage deviceStartMessage = JSONUtil.toObject(JSONUtil.toString(message), DeviceStartMessage.class);
log.info("接收到设备启动消息:{}", deviceStartMessage);
}
private void handlePlateRecognitionReportMessage(MapString, Object> message) {
PlateRecognitionReportMessage plateRecognitionReportMessage = JSONUtil.toObject(JSONUtil.toString(message), PlateRecognitionReportMessage.class);
log.info("接收到车牌上报识别消息:{}", plateRecognitionReportMessage);
}
}
4.6 发送延迟消息
要发送延迟消息,需要先开启emq
延迟发布配置。
发送延时消息的方式相比之前,仅仅增加一个延时时间。其中延时时长的单位为秒
,最大为4294967
秒
//发送一个延时时长为10s的消息
defaultMQTTClient.publish2ThirdParty("npt/park/type1/dev/095437323930030130523933", 1, heartbeatReplyMessage, 10);
4.7 多数据源
多数据源与单数据源配置属性相同,在配置文件中声明即可
spring:
mqtt:
emq:
client:
# 多数据源客户端名称,默认default
default:
# broker地址
host: 127.0.0.1
# 端口
port: 31883
# 用户名
username: admin
# 密码
password: 123456
# 多数据源客户端名称
parking:
# broker地址
host: 127.0.0.1
# 端口
port: 31883
# 用户名
username: admin
# 密码
password: 123456
4.7.1 发布消息
首先注入MQTTClient
,与单数据源的唯一区别就是bean
的名称。默认向Spring
容器中添加的实现类名称为“${数据源名称}MQTTClient”
以上面的配置文件为例,默认的bean
名称为 defaultMQTTClient
和 billMQTTClient
import com.demo.mqttclient.MQTTClient;
@Resource
private MQTTClient defaultMQTTClient;
@Resource
private MQTTClient billMQTTClient;
其他操作同单数据源
4.7.2 接收消息
接收消息与单数据源基本一致,唯一的区别是在@MQTTSubscriber
中指定clientName
属性,指定当前从哪个数据源进行消费。
import com.demo.mqttclient.anno.MQTTSubscriber;
@MQTTSubscriber(topics = {"npt/park/type1/server/10010"}, clientName = "parking")
public class ParkingMessageHandler {}
4.8 分组共享订阅
系统默认使用spring.application.name
作为分组名称,用户可在消息消费类上指定@MQTTSubscriber
属性中groupName = "group_name"
即可
import com.demo.mqttclient.anno.MQTTSubscriber;
@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.GROUP_SHARE, groupName = "group_name")
public class ParkingMessageHandler {}
4.9 不分组共享订阅
只需要在消息消费类上指定@MQTTSubscriber
属性中share = ShareModelEnum.UN_GROUP_SHARE
即可。
import com.demo.mqttclient.anno.MQTTSubscriber;
@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, share = ShareModelEnum.UN_GROUP_SHARE)
public class ParkingMessageHandler {}
4.10 排它订阅
只需要在消息消费类上指定@MQTTSubscriber
属性中exclusive = true
即可,开启排它订阅时,默认关闭共享订阅。
import com.demo.mqttclient.anno.MQTTSubscriber;
@MQTTSubscriber(topics = ParkingMessageHandler.TOPIC, qos = 1, exclusive = true)
public class ParkingMessageHandler {}
1.MQTT协议介绍 1.1 MQTT协议 MQTT(消息队列遥测传输) 是基于 TCP/IP 协议栈而构建的支持在各方之间异步通信的消息协议。MQTT在空间和时间上将消息发送者与接收者分离,因此可以在不可靠的网络环境中进行扩展。虽然叫做消息队列遥测传输,但它…