轻松使用中移物联网平台Onenet,MQTT协议快速接入实验,使用Onenet平台MQTT协议开发个人智能设备的解决方案

文章目录

  • 前言
  • 一、Onenet是什么?
  • 二、使用步骤
    • 1.总体流程
    • 2.添加产品
    • 3.调试API
      • 3.1.新增设备
        • python中调试
      • 3.2.注册设备
        • python中调试
      • 3.3.查询设备
        • python中调试
      • 3.4.新增数据流
        • python中调试
      • 3.5.上传数据点
        • python中调试
      • 3.6.查询数据流
        • python中调试
      • 3.7.发布消息
        • python中调试
      • 3.8.完整代码
    • 4.调试MQTT
      • 4.1.建立连接
      • 4.2.订阅消息
      • 4.3.上传数据点
      • 4.4.完整代码
  • 总结

前言

万物互联时代,DIY智能产品离不开物联网,本文意在记录快速接入流程,避免深陷官方文档中无法自拔。本文通过MQTT协议接入Onenet,用Python进行简单的调试,通过实验测试各项功能是否可靠。

一、Onenet是什么?

https://open.iot.10086.cn/console
OneNET是由中国移动打造的PaaS物联网开放平台。类似的平台有阿里云,机智云等,之所以选择Onenet,是因为使用起来十分简洁高效,开发文档清晰明了,提供了许多API接口方便调试。

二、使用步骤

1.总体流程

注册登录

添加产品

调试API

调试MQTT

为了一劳永逸,本文对于Onenet平台中控制台的操作仅需要添加产品,其他操作全部通过API完成,方便快速移植,实现全自动部署。

2.添加产品

注册登录后,进入控制台注册产品,

控制台

多协议接入

MQTT旧版

添加产品

轻松使用中移物联网平台Onenet,MQTT协议快速接入实验,使用Onenet平台MQTT协议开发个人智能设备的解决方案
添加产品后暂不添加设备
轻松使用中移物联网平台Onenet,MQTT协议快速接入实验,使用Onenet平台MQTT协议开发个人智能设备的解决方案
进入产品页面查看以下信息
轻松使用中移物联网平台Onenet,MQTT协议快速接入实验,使用Onenet平台MQTT协议开发个人智能设备的解决方案
轻松使用中移物联网平台Onenet,MQTT协议快速接入实验,使用Onenet平台MQTT协议开发个人智能设备的解决方案
产品ID:493938
Master-APIkey:MrXYtMYoqeWzt5d=WQaCMja=6F8=
access_key:FDnETJpnXnyMA6g2feVcFkFHrv6PxVvIkVrYBdfS8/w=
设备注册码:r6jc2WyJluQrV36F

3.调试API

API使用详情:https://open.iot.10086.cn/doc/v5/develop/detail/481
为提高API访问安全性,OneNET API的鉴权参数作为header参数存在,API的访问有两种安全机制,普通方式直接传输密钥,不够安全,token鉴权更为安全,但操作上需要复杂了一步,就是需要增加生成token的算法,官方也有提供,先测试普通方式。
轻松使用中移物联网平台Onenet,MQTT协议快速接入实验,使用Onenet平台MQTT协议开发个人智能设备的解决方案

3.1.新增设备

众所周知,HTTP请求方式有POST、GET、PUT等,请求最多三部分组成:URL、Header、Body,根据文档配置好三个部分的参数即可。

新增设备api-key使用产品级密钥Master-APIkey,Body使用json格式数据。
请求方式:POST

  • URL: http://api.heclouds.com/devices
  • Header:api-key = “MrXYtMYoqeWzt5d=WQaCMja=6F8=”
  • Body:{“title”: “test_device”}

python中调试

运行

#新增设备
import requests
import json
MasterAPIkey = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#产品核心密钥

URL = "http://api.heclouds.com/devices"
Header = {"api-key" : MasterAPIkey}
Body = {"title": "test_device"}

r = requests.post(URL,json=Body,headers=Header).json()
print(json.dumps(r, indent=2))

返回

{
  "errno": 0,
  "data": {
    "device_id": "906086916"
  },
  "error": "succ"
}

3.2.注册设备

注册设备不需要密钥,只需要注册码,Body使用json格式数据。
请求方式:POST

  • URL: http://api.heclouds.com/register_de?register_code=r6jc2WyJluQrV36F
  • Body:{“sn”: “123456789”}

python中调试

运行

#注册设备
import requests
import json
register_code = "r6jc2WyJluQrV36F"#注册码

URL = "http://api.heclouds.com/register_de?register_code="+register_code
Body = {"title":"python_device","sn": "123456789"}

r = requests.post(URL,json=Body).json()
print(json.dumps(r, indent=2))

返回

{
  "errno": 0,
  "data": {
    "device_id": "906087737",
    "key": "Wh83vAaZGFee8ktcFDbZOTVoh90="
  },
  "error": "succ"
}

3.3.查询设备

查询设备api-key可以使用产品级密钥Master-APIkey,也可以使用设备级密钥key,设备ID和设备级密钥可通过以上注册设备API获得,也可在设备管理页面查看。GET请求无需Body。
请求方式:GET

  • URL: http://api.heclouds.com/devices/906087737
  • Header:api-key = “Wh83vAaZGFee8ktcFDbZOTVoh90=”

python中调试

运行

#查询设备
import requests
import json

device_id = "906087737"#设备ID
key = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#设备密钥

URL = "http://api.heclouds.com/devices/"+device_id
Header = {"api-key" : key}

r = requests.get(URL,headers=Header).json()
print(json.dumps(r, indent=2))

返回

{
  "errno": 0,
  "data": {
    "protocol": "MQTT",
    "private": true,
    "create_time": "2022-03-12 14:53:26",
    "keys": [
      {
        "title": "auto generated device key",
        "key": "Wh83vAaZGFee8ktcFDbZOTVoh90="
      }
    ],
    "online": false,
    "id": "906087737",
    "auth_info": "123456789",
    "title": "python_device"
  },
  "error": "succ"
}

3.4.新增数据流

新增数据流Body中必填参数为数据流ID。
请求方式:POST

  • URL: http://api.heclouds.com/devices/906087737/datastreams
  • Header:api-key = “Wh83vAaZGFee8ktcFDbZOTVoh90=”
  • Body:{“id”: “temp”}

python中调试

运行

#新增数据流
import requests
import json
device_id = "906087737"
key = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#设备密钥

URL = "http://api.heclouds.com/devices/"+device_id+"/datastreams"
Header = {"api-key" : key}

Body = {"id": "temp"}

r = requests.post(URL,json=Body,headers=Header).json()
print(json.dumps(r, indent=2))

返回

{
  "errno": 0,
  "data": {
    "ds_uuid": "d94a90f5-8bd0-5672-b967-14082115dd79"
  },
  "error": "succ"
}

3.5.上传数据点

上传数据点Body中按制定格式填写,可批量上传。
请求方式:POST

  • URL: http://api.heclouds.com/devices/906087737/datapoints
  • Header:api-key = “Wh83vAaZGFee8ktcFDbZOTVoh90=”
  • Body:{“datastreams”: [{“id”: “temp”,“datapoints”: [{“value”: 62} ]}]}

python中调试

运行

#上传数据点
import requests
import json
device_id = "906087737"
key = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#设备密钥

URL = "http://api.heclouds.com/devices/"+device_id+"/datapoints"
Header = {"api-key" : key}
Body = {"datastreams": [{"id": "temp","datapoints": [{"value": 62}	]}]}

r = requests.post(URL,json=Body,headers=Header).json()
print(json.dumps(r, indent=2))

返回

{
  "errno": 0,
  "error": "succ"
}

3.6.查询数据流

查询数据流,URL对应相应的数据流ID。
请求方式:GET

  • URL: http://api.heclouds.com/devices/906087737/datastreams/temp
  • Header:api-key = “Wh83vAaZGFee8ktcFDbZOTVoh90=”

python中调试

运行

#查询数据流
import requests
import json

device_id = "906087737"
key = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#设备密钥
datastream_id = "temp"

URL = "http://api.heclouds.com/devices/"+device_id+"/datastreams/"+datastream_id
Header = {"api-key" : key}

r = requests.get(URL,headers=Header).json()
print(json.dumps(r, indent=2))

返回

{
  "errno": 0,
  "data": {
    "update_at": "2022-03-12 15:21:08",
    "id": "temp",
    "create_time": "2022-03-12 15:11:14",
    "current_value": 62
  },
  "error": "succ"
}

3.7.发布消息

发布消息URL指定相应的Topic,api-key必须使用产品级密钥Master-APIkey。
请求方式:POST

  • URL: http://api.heclouds.com/mqtt?topic=python_topic
  • Header:api-key = “Wh83vAaZGFee8ktcFDbZOTVoh90=”
  • Body:“Hello mqtt!”

python中调试

运行

#发布消息
import requests
import json

MasterAPIkey = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#产品核心密钥

URL = "http://api.heclouds.com/mqtt?topic=python_topic"
Header = {"api-key" : MasterAPIkey}
Body = "Hello mqtt!"

r = requests.post(URL,data=Body,headers=Header).json()
print(json.dumps(r, indent=2))

返回

{
  "errno": 6,
  "error": "invalid parameter: topic is not exists"
}

返回中提示topic不存在,因为没有设备订阅过topic,通过4调试MQTT订阅该topic后便不会出错。

3.8.完整代码

运行

#新增设备
print("新增设备")
import requests
import json
MasterAPIkey = "MrXYtMYoqeWzt5d=WQaCMja=6F8="#产品核心密钥

URL = "http://api.heclouds.com/devices"
Header = {"api-key" : MasterAPIkey}
Body = {"title": "test_device"}

r = requests.post(URL,json=Body,headers=Header).json()
print(json.dumps(r, indent=2))

#注册设备
print("注册设备")
register_code = "r6jc2WyJluQrV36F"#注册码

URL = "http://api.heclouds.com/register_de?register_code="+register_code
Body = {"title":"python_device","sn": "123456789"}

r = requests.post(URL,json=Body).json()
print(json.dumps(r, indent=2))

#查询设备
print("查询设备")
device_id = r["data"]["device_id"]#设备ID
key = r["data"]["key"]#设备密钥

URL = "http://api.heclouds.com/devices/"+device_id
Header = {"api-key" : key}

r = requests.get(URL,headers=Header).json()
print(json.dumps(r, indent=2))

#新增数据流
print("新增数据流")
datastream_id = "temp"

URL = "http://api.heclouds.com/devices/"+device_id+"/datastreams"
Header = {"api-key" : key}

Body = {"id": datastream_id}

r = requests.post(URL,json=Body,headers=Header).json()
print(json.dumps(r, indent=2))

#上传数据点
print("上传数据点")
URL = "http://api.heclouds.com/devices/"+device_id+"/datapoints"
Header = {"api-key" : key}
Body = {"datastreams": [{"id": datastream_id,"datapoints": [{"value": 62}	]}]}

r = requests.post(URL,json=Body,headers=Header).json()
print(json.dumps(r, indent=2))

#查询数据流
print("查询数据流")
URL = "http://api.heclouds.com/devices/"+device_id+"/datastreams/"+datastream_id
Header = {"api-key" : key}

r = requests.get(URL,headers=Header).json()
print(json.dumps(r, indent=2))

#发布消息
print("发布消息")
URL = "http://api.heclouds.com/mqtt?topic=python_topic"
Header = {"api-key" : MasterAPIkey}
Body = "Hello mqtt!"

r = requests.post(URL,data=Body,headers=Header).json()
print(json.dumps(r, indent=2))

返回

新增设备
{
  "errno": 0,
  "data": {
    "device_id": "906147806"
  },
  "error": "succ"
}
注册设备
{
  "errno": 0,
  "data": {
    "device_id": "906147809",
    "key": "R7YqtyhNtN2JkeeR0Axo=B6jnEY="
  },
  "error": "succ"
}
查询设备
{
  "errno": 0,
  "data": {
    "protocol": "MQTT",
    "private": true,
    "create_time": "2022-03-12 17:35:43",
    "keys": [
      {
        "title": "auto generated device key",
        "key": "R7YqtyhNtN2JkeeR0Axo=B6jnEY="
      }
    ],
    "online": false,
    "id": "906147809",
    "auth_info": "123456789",
    "title": "python_device"
  },
  "error": "succ"
}
新增数据流
{
  "errno": 0,
  "data": {
    "ds_uuid": "934076ef-adf9-53e3-9230-6159e3daedfc"
  },
  "error": "succ"
}
上传数据点
{
  "errno": 0,
  "error": "succ"
}
查询数据流
{
  "errno": 0,
  "data": {
    "update_at": "2022-03-12 17:35:43",
    "id": "temp",
    "create_time": "2022-03-12 17:35:43",
    "current_value": 62
  },
  "error": "succ"
}
发布消息
{
  "errno": 6,
  "error": "invalid parameter: no device subscribe to this topic"
}

4.调试MQTT

4.1.建立连接

连接Onenet服务器,mqtt用户名和密码对应产品ID和鉴权信息
运行

import paho.mqtt.client as mqtt
import random,json

HOST = "183.230.40.39"
PORT = 6002

Device_id = "906147809"#设备ID
Product_id = "493938"#产品ID Mqtt用户名
Authorization = "123456789"#鉴权信息 Mqtt密码

def on_connect(client, userdata, flags, rc):
    print("连接结果:" + mqtt.connack_string(rc))

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

client = mqtt.Client(Device_id)
client.username_pw_set(Product_id,Authorization)

client.on_connect = on_connect#连接回调
client.on_message = on_message#消息回调

client.connect(HOST, PORT, 120)
client.loop_forever()

返回

连接结果:Connection Accepted.

查看设备详情
轻松使用中移物联网平台Onenet,MQTT协议快速接入实验,使用Onenet平台MQTT协议开发个人智能设备的解决方案

4.2.订阅消息

订阅python_topic

client.subscribe("python_topic")

4.3.上传数据点

设备使⽤publish报⽂来上传数据点,需要往系统指定主题发送指定格式数据,这两点非常关键。见设备终端接入协议-MQTT V2.6

指定主题:$dp
指定格式数据:3字节报头 + 数据

3字节报头:

  • 字节1 数据点类型
  • 字节2 数据长度高8位
  • 字节3 数据长度低8位

0x01

0x02

0x03

0x04

0x05

0x06

0x07

字节1

JSON格式1

二进制数据

JSON格式2

JSON格式3

自定义分隔符

带时间自定义分隔符

可离散浮点数数据流

JSON格式2对应的数据格式较简单:{“datastream_id”:”value”}
将数据按json格式2打包:

# 转换为Json格式2字节数组
def payload_json2(json_data):
    data_bytes = json.dumps(json_data).encode('utf-8')
    byte1 = 0x03
    byte2 = len(data_bytes)>>8
    byte3 = len(data_bytes)%256
    payload = bytes([byte1,byte2,byte3]) + data_bytes
    return payload

上传数据点

# 上传数据点
value = random.randint(0,100)#生成随机整数
data = {"temp":value}
payload = payload_json2(data)
client.publish("$dp", payload, qos=0)

4.4.完整代码

连接成功

订阅python_topic

收到python_topic主题的消息

上传数据点

import paho.mqtt.client as mqtt
import random,json

HOST = "183.230.40.39"
PORT = 6002

Device_id = "906147809"#设备ID
Product_id = "493938"#产品ID Mqtt用户名
Authorization = "123456789"#鉴权信息 Mqtt密码

# 转换为Json格式2字节数组
def payload_json2(json_data):
    data_bytes = json.dumps(json_data).encode('utf-8')
    byte1 = 0x03
    byte2 = len(data_bytes)>>8
    byte3 = len(data_bytes)%256
    payload = bytes([byte1,byte2,byte3]) + data_bytes
    return payload

def on_connect(client, userdata, flags, rc):
    print("连接结果:" + mqtt.connack_string(rc))

    client.subscribe("python_topic")#订阅python_topic

def on_message(client, userdata, msg):
    print(msg.topic+" "+str(msg.payload))

    if msg.topic == "python_topic":
        # 上传数据点
        value = random.randint(0,100)
        data = {"temp":value}
        payload = payload_json2(data)
        client.publish("$dp", payload, qos=0)


client = mqtt.Client(Device_id)
client.username_pw_set(Product_id,Authorization)

client.on_connect = on_connect#连接回调
client.on_message = on_message#消息回调

client.connect(HOST, PORT, 120)
client.loop_forever()

总结

通过实验找到了一种简洁使用Onenet平台MQTT协议开发个人智能设备的解决方案

  1. 通过API接口注册设备获取设备ID和设备密钥
  2. 设备端通过MQTT协议连接物联网平台,通过MQTT协议上传数据点
  3. 设备端订阅指定Topic
  4. 上位机通过API接口发布消息,保证设备端能改实时接收
  5. 上位机通过API接口查询数据流获取数据点,实现非实时接收
  6. Onenet控制台可以通过设备管理页面查看所有历史数据点
本文章来源于互联网,如有侵权,请联系删除!原文地址:轻松使用中移物联网平台Onenet,MQTT协议快速接入实验,使用Onenet平台MQTT协议开发个人智能设备的解决方案

相关推荐: 【物联网】微信小程序接入阿里云物联网平台

微信小程序接入阿里云物联网平台 一 阿里云平台端 1.登录阿里云 阿里云物联网平台 点击进入公共实例,之前没有的点进去申请 2.点击产品,创建产品 3.产品名称自定义,按项目选择类型,节点类型选择之恋设备,联网方式WiFi,数据格式选择透传/自定义,其他默认 …