本文從概念到實戰,以一個假想產品——”電子貨架標簽“(Electronic Shelf Label,以下簡稱ESL)為例,介紹基於阿里雲IoT的物聯網應用開發。
數據交互流程
以雲端下發命令到最終收到應答為例(虛線表示異步):

LoRaWAN:ESL所采用的通訊協議;LoRaWAN NS:LoRaWAN網絡的中樞大腦,控制通訊參數、實現QoS、節點入網和遷移、數據加解密等。MQTT:基於Pub/Sub范式的消息協議。它工作在 TCP/IP協議族上,是為硬件性能低下的遠程設備以及網絡狀況糟糕的情況下而設計。Link WAN:阿里雲物聯網絡管理平台,可用它快速組建LoRaWAN網絡;簡單地說,它主要扮演了LoRaWAN NS的角色;AliIoT:阿里雲物聯網平台,基於MQTT。處理設備層和業務層的數據交互;AMQP:消息隊列,設備異步應答返回的消息通過此消息隊列傳遞到雲端。(廣義上說,AMQP是一個協議,RabbitMQ就是該協議的一個實現)
ESL和LoRa網關是通過LoRa協議通信,LoRa可以看做是物理層面的信息調制協議或通訊協議,沒有TCP的概念。
注意,MQTT並不局限於LoRaWAN場景,阿里雲也在平台上將二者作了不同入口,前者對應AliIoT,后者對應Link WAN。初次接觸不免困惑(這也是阿里雲一貫的作風),其實背后就是這個關系。我們可以設備直連AliIoT做IoT應用開發(參看10分鍾物聯網設備接入阿里雲IoT平台);如果是LoRaWAN系統,也可以同時借助 Link WAN 做LoRaWAN的網絡管理。
網關要接入Link WAN,需要移植阿里雲提供的SDK到網關與通信模組上,並且購買Link WAN密鑰安裝,並登錄阿里雲物聯網絡管理平台控制台添加網關。雲端開發人員只要關注AliIoT、AMQP及業務層即可。
AliIoT控制台准備
-
公共實例-》創建產品。產品名稱“電子貨架標簽”;節點類型表示該產品下設備的類型,選擇
直連設備(LoRa有IP的概念?),然后連網方式選擇LoRaWAN;因為ESL設備收發的數據為未編碼的字節數組,數據格式選擇透傳/自定義,后續需要提供數據解析腳本,將上行的自定義格式的數據轉換為Alink JSON格式,將下行的Alink JSON格式數據解析為設備自定義格式,設備才能與雲端進行通信。產品創建完畢獲得ProductKey。 -
管理產品-》功能定義,即定義所謂的
物模型。功能分為屬性、服務、事件三種類型(同定義一個類一樣,有屬性、方法、事件)。一個產品可以定義多個物模型,即一個產品下面可以有提供不同功能的多種設備。這里我們為ESL定義——- 屬性:shelfNo,所屬貨架,數據類型text。示例A.05.02,A區5排2號貨架;
- 服務:show,顯示貨品名稱和對應價格,入參有productName:text,price:float,調用方式選擇異步;
- 事件:heart,心跳,我們可以定義一些輸出參數如電池電量batteryLevel:int32,固件版本firmwareVersion:text,如此每次回報時這些信息也傳給雲端。
這樣,雲端就可以下發查詢電池電量和設置貨品名稱和對應價格的兩種命令,同時也可以被動接收設備返回的心跳消息。當然,物模型只是定義了接口,具體實現需要設備端和雲端共同完成。
物模型中服務調用方式可設置
同步或者異步。同步方式:物聯網平台直接使用RRPC同步方式下行推送請求,設備返回RRPC響應消息。RRPC使用詳情,請參見什么是RRPC。異步方式:物聯網平台采用異步方式下行推送請求,設備采用異步方式返回結果。 -
管理產品-》數據解析。上面說到,設備和雲端的交互數據需要中間的解析(序列化/反序列化)過程(發生在上圖第1步之后和第4步之前)。以JavaScript腳本為例:
var ALINK_EVENT_HEART_POST_METHOD = 'thing.event.heart.post'; //物聯網平台Topic,設備心跳包上報 var ALINK_EVENT_ACK_POST_METHOD = 'thing.event.ack.post'; //物聯網平台Topic,設備服務應答上報 var ALINK_PROP_REPORT_METHOD = 'thing.event.property.post'; //物聯網平台Topic,設備屬性上報 var ALINK_PROP_SET_METHOD = 'thing.service.property.set'; //物聯網平台Topic,雲端下發屬性控制指令到設備端。 var ALINK_PROP_SET_REPLY_METHOD = 'thing.service.property.set'; //物聯網平台Topic,設備上報屬性設置的結果到雲端。 var ALINK_SERVICE_SHOW_METHOD = 'thing.service.show'; //物聯網平台Topic,雲端調用設備show服務 /** * 將Alink協議的數據轉換為設備能識別的格式數據,物聯網平台給設備下發數據時調用 * 入參:jsonObj,對象,不能為空。 * 出參:rawData,byte[]數組,不能為空。 * * 示例數據: * 雲端下發屬性設置指令: * 傳入參數: * {"method":"thing.service.property.set","id":"12345","version":"1.0","params":{"shelfNo":"A.05.02"}} * 注意:雲端只下發{"shelfNo":"A.05.02"},其余結構是AliIoT封裝的。 */ function protocolToRawData(jsonObj) { var method = jsonObj['method']; var params = json['params']; //按照自定義協議格式拼接 rawData var rawdata = [0x5d, 0x64, 0x00]; if (method == ALINK_PROP_SET_METHOD) { //設置屬性 rawdata = rawdata.concat(textToByteArray(params['shelfNo'])); } else if (method == ALINK_SERVICE_SHOW_METHOD) { //調用服務 var productName = params['productName']; var price = params['price']; rawdata = rawdata.concat(textToByteArray(productName)); rawdata = rawdata.concat(floatToByteArray(price)); } //other commands ... return rawdata; } /** * 將設備的自定義格式數據轉換為Alink協議的數據,設備上報數據到物聯網平台時調用。 * 入參:rawData,byte[]數組,不能為空。 * 出參:jsonObj,對象,不能為空。 * * 示例數據: * 設備心跳上報: * 傳入參數: * 0xFF1020010005 * 輸出結果: * {"method":"thing.event.heart.post","id":"12345678","params":{"batteryLevel":32,"firmwareVersion":"1.0.5"},"version":"1.0"} */ function rawDataToProtocol(rawData) { var uint8Array = new Uint8Array(rawData.length); for (var i = 0; i < bytes.length; i++) { uint8Array[i] = bytes[i] & 0xff; } var dataView = new DataView(uint8Array.buffer, 0); var jsonObj = new Object(); var params = {}; var head = uint8Array.slice(0, 2).join(); //自定義協議包頭 if (head[0] == 0xFF && head[1] == 0x10) { params['batteryLevel'] = dataView.getInt8(2); params['firmwareVersion'] = `${dataView.getInt8(3)}.${dataView.getInt8(4)}.${dataView.getInt8(5)}`; jsonObj['method'] = ALINK_EVENT_HEART_POST_METHOD; } else { //其它數據包轉換 } jsonObj['version'] = '1.0'; //ALink JSON格式,協議版本號固定字段。 jsonObj['id'] = '12345678' //ALink JSON格式,標示該次請求id值。 jsonObj['params'] = params; return jsonObj; } /** * 處理自定義Topic,本示例不涉及 */ function transformPayload(topic, rawData) { var jsonObj = {} return jsonObj; }數據解析的前提之一是設備收發的數據格式要確定好。
上述腳本將業務數據和字節數組進行了轉換,若是擔心數據協議外泄[給阿里雲?],這部分工作也可以放在雲端,腳本文件只用來進行字節數組的轉發(這種情況下,物模型所有功能的出參入參都只需要一個,數據格式為int32array)。
-
管理產品-》服務端訂閱。創建AMQP訂閱,AMQP會將消息推送給列表中的所有消費組,一個消費組可看做是一個消息隊列,雲端作為客戶端連接某隊列得到設備上報消息。我們新建名稱為“電子貨架標簽-Q1”的消費組,得到一串自動生成的消費組ID。
雲端開發
以Java/Kotlin為例,先引入SDK:
//下發命令依賴
implementation("com.aliyun:aliyun-java-sdk-core:4.5.22")
implementation("com.aliyun:aliyun-java-sdk-iot:7.27.0")
//獲取應答依賴
implementation("org.apache.qpid:qpid-jms-client:0.59.0")
implementation("commons-codec:commons-codec:1.15")
下發show命令:
@Service
class AliIoTDemo {
@Autowired
lateinit var config: AliIoTConfig
private lateinit var client: IAcsClient
@PostConstruct
fun init() {
val profile =
DefaultProfile.getProfile(config.regionId, config.accessKeyId, config.accessKeySecret)
client = DefaultAcsClient(profile)
}
/**
* loraId: 設備編號,對應AliIoT的DeviceName
*/
fun show(loraId: String) {
val gson = GsonInstance.get()
val jo = JsonObject()
jo.addProperty("productName", "康師傅方便面")
jo.addProperty("price", 3.50)
val request = InvokeThingServiceRequest().apply {
productKey = config.productKey //創建物聯網產品時得到ProductKey
deviceName = loraId
identifier = "show" //物模型定義的服務名稱
args = gson.toJson(jo) //{"productName": "康師傅方便面", "price": 3.50}
}
client.doAction(request)
}
}
代碼中的client.doAction是無法得到應答的,所以我們還要寫一個AMQP客戶端去異步獲得應答消息,具體參看官方示例Java SDK接入示例 - 阿里雲物聯網平台。
多條異步命令順序執行
如果一個事務只要下發一條命令,那就等着拿結果就好了;但是有多條異步命令需要順序執行的話,就稍微有點麻煩了,我們要考慮上下文的掛起和恢復、超時取消等機制。以下為簡單示例:
//保存各事務對應的等待發送的命令隊列,命令一旦發送則須從隊列中移除
//key為設備編號,二元組第一項表示事務開始時間,用於超時判斷
private val cmdSetMap = ConcurrentHashMap<String, Pair<Long, Queue<InvokeThingServiceRequest>>>()
internal fun putInvokeThingServiceRequest(deviceNo: String, requests: Queue<InvokeThingServiceRequest>) {
//同樣設備之前的命令不再執行,移除
cmdSetMap.remove(deviceNo)
if (requests.size == 1) { //只有一條命令則直接發送
client.doAction(requests.poll())
} else {
val request = requests.poll() //先發送第一條
cmdSetMap[deviceNo] = Pair(System.currentTimeMillis(), requests) //其余的存入待發送列表
client.doAction(request)
}
}
//...
{
//應答消息抵達后,若應答OK則執行下一條命令
val request = cmdSetMap[deviceName]!!.second.poll()
try {
client.doAction(request)
} catch (ex: Exception) {
logger.error(ex)
// 發生錯誤 通知客戶端
}
if (cmdSetMap[deviceName]!!.second.size == 0) cmdSetMap.remove(deviceName)
}
//每分鍾清理過時事務
@Scheduled(cron = "0 * * * * *")
fun removeTimeoutCmd() {
//...
}
在語言層面,不管是以前的回調地獄還是后來興起的async/await、suspend、Promise等,都能處理這種場景。本質上,異步回調是指令尋址、變量出入棧的過程,有時還涉及到線程上下文的切換,各種語言/框架都幫我們考慮並且做了,我們只要按照既定語法編寫業務代碼即可。
為什么業務端不能直接訂閱對應的topic呢,這樣不就能直接拿到數據了嗎?AliIoT似乎也沒有提供業務層直接訂閱 AliIoT topic 的入口。不過MQTT協議是基於PUB/SUB的異步通信模式,就算業務端能直接接收到應答,也要處理應答消息轉發到對應的上下文、上下文掛起恢復等問題。
