Thingsboard MQTT模块功能分析

1、Thingsboard MQTT Transport Service

Thingsboard MQTT模块功能分析

org.thingsboard.server.mqtt.ThingsboardMqttTransportApplication,MQTT服务启动类,使用SpringBoot启动类,通过加载模块和配置文件,对服务进行配置并运行。

@SpringBootConfiguration
@EnableAsync
@EnableScheduling
@ComponentScan({"org.thingsboard.server.mqtt", "org.thingsboard.server.common", "org.thingsboard.server.transport.mqtt", "org.thingsboard.server.kafka"})
public class ThingsboardMqttTransportApplication {

    private static final String SPRING_CONFIG_NAME_KEY = "--spring.config.name";
    private static final String DEFAULT_SPRING_CONFIG_PARAM = SPRING_CONFIG_NAME_KEY + "=" + "tb-mqtt-transport";

    public static void main(String[] args) {
        SpringApplication.run(ThingsboardMqttTransportApplication.class, updateArguments(args));
    }

    private static String[] updateArguments(String[] args) {
        if (Arrays.stream(args).noneMatch(arg -> arg.startsWith(SPRING_CONFIG_NAME_KEY))) {
            String[] modifiedArgs = new String[args.length + 1];
            System.arraycopy(args, 0, modifiedArgs, 0, args.length);
            modifiedArgs[args.length] = DEFAULT_SPRING_CONFIG_PARAM;
            return modifiedArgs;
        }
        return args;
    }
}

@EnableAsync注解使用来开启异步线程,
@EnableScheduling注解使用来开启定时任务。
@ComponentScan({“org.thingsboard.server.mqtt”, “org.thingsboard.server.common”, “org.thingsboard.server.transport.mqtt”, “org.thingsboard.server.kafka”}): 扫描这些包下的所有使用@Component 的类,不管自动导入还是导出。

updateArguments的作用是:启动时,使用 –spring.config.name = tb-mqtt-transport, 指定配置名,包括但不仅限于tb-mqtt-transport.conf等文件。

2、MQTT Transport Common

Netty框架

Thingsboard的Mqtt协议逻辑实现是通过Netty实现的,Netty是一个NIO客户端、服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和简化了网络编程,例如TCP和UDP套接字服务器。

  • Netty 官网
  • Netty GitHub仓库
引入依赖

MQTT Transport common通过引入Netty 4.x版本的jar包对Mqtt进行协议逻辑实现,Netty4.x和3.x的区别还是挺大的。


    io.netty
    netty-all

参数配置

  mqtt:
    # Enable/disable mqtt transport protocol.
    enabled: "${MQTT_ENABLED:true}"
    bind_address: "${MQTT_BIND_ADDRESS:0.0.0.0}"
    bind_port: "${MQTT_BIND_PORT:1883}"
    timeout: "${MQTT_TIMEOUT:10000}"
    netty:
      leak_detector_level: "${NETTY_LEAK_DETECTOR_LVL:DISABLED}"
      boss_group_thread_count: "${NETTY_BOSS_GROUP_THREADS:1}"
      worker_group_thread_count: "${NETTY_WORKER_GROUP_THREADS:12}"
      max_payload_size: "${NETTY_MAX_PAYLOAD_SIZE:65536}"
    # MQTT SSL configuration
    ssl:
      # Enable/disable SSL support
      enabled: "${MQTT_SSL_ENABLED:false}"
      # SSL protocol: See http://docs.oracle.com/javase/8/docs/technotes/guides/security/StandardNames.html#SSLContext
      protocol: "${MQTT_SSL_PROTOCOL:TLSv1.2}"
      # Path to the key store that holds the SSL certificate
      key_store: "${MQTT_SSL_KEY_STORE:mqttserver.jks}"
      # Password used to access the key store
      key_store_password: "${MQTT_SSL_KEY_STORE_PASSWORD:server_ks_password}"
      # Password used to access the key
      key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
      # Type of the key store
      key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"

模块目录结构

└── main
    └── java
        └── org
            └── thingsboard
                └── server
                    └── transport
                        └── mqtt
                            ├── MqttSslHandlerProvider.java Mqtt Ssl逻辑处理提供类
                            ├── MqttTopics.java Mqtt预定义主题
                            ├── MqttTransportContext.java Mqtt传输协议上下文
                            ├── MqttTransportHandler.java Mqttt传输协议逻辑处理类
                            ├── MqttTransportServerInitializer.java Mqtt传输协议初始化类
                            ├── MqttTransportService.java   Mqtt传输协议启动类
                            ├── adaptors
                            │   ├── JsonMqttAdaptor.java Mqtt传输内容Json适配器
                            │   └── MqttTransportAdaptor.java Mqtt协议传输适配器
                            ├── session
                            │   ├── DeviceSessionCtx.java 设备会话上下文
                            │   ├── GatewayDeviceSessionCtx.java 网关设备会话上下文
                            │   ├── GatewaySessionHandler.java 网关会话处理类
                            │   ├── MqttDeviceAwareSessionContext.java Mqtt设备会话上下文
                            │   └── MqttTopicMatcher.java Mqtt主题匹配器
                            └── util
                                └── SslUtil.java Ssl工具类