流分析 Stream Analytics-實時數據流式處理,可處理來自數百萬台 IoT 設備的數據


典型的物聯網架構中,有實時數據分析的需求,在Azure中,流分析(stream analytics)就是這樣的服務,它可以存在雲中或者部署到邊緣設備上。

 

流分析的基本概念:

https://v.qq.com/x/page/o3034qra1zz.html

 

實戰案例:

對物聯網傳感器數據進行實時分析,每30秒監視一次傳感器上報的平均溫度,高於100度時報警,報警的方式是將報警的傳感器及其溫度值直接寫入到Sql DB。

實戰內容:

https://v.qq.com/x/page/g3034ntojf8.html

 

 

雲中的實時流式處理

•為物聯網解決方案執行實時分析

•每秒對數百萬事件進行流式處理

•獲得關鍵任務的可靠性和性能預測結果

•利用設備和應用程序的數據,創建實時儀表板和警報

•跨多個數據流進行關聯

•使用常見的基於 SQL 的語言,以實現快速開發

 

比如我們要使用流分析完成如下的需求:

每30秒監視一次傳感器上報的平均溫度,高於100度時報警。通常情況下,我們將流分析部署於雲端,在IoT Hub之后,從IoT Hub接收設備數據進行在線分析。

 

在這個場景中,如果我們的傳感器有成千上萬,每個傳感器上傳的數據大部分又都是低於100°C的,這些數據實際上對我們來說,沒有意義,所以我們還會期待,在設備側直接進行分析,然后講分析過濾后的數據,上傳到IoT Hub,一來降低數據傳輸的成本,二來對數據進行過濾,保證數據質量,這也就是流分析的第二種部署方式:部署到邊緣

下面介紹流分析的幾個重要概念:

1.輸入:目前支持Azure EventHub事件中心,Azure IoT Hub, Azure Blob 存儲三個源。

 

輸入分兩種類型:

將數據推送到數據源后,流分析作業就可使用該數據並對其進行實時處理。 輸入分為兩種類型:流輸入(stream inputs)和引用輸入(referenceinputs)。

  • 流輸入是指:數據流是一段時間內不受限制的事件序列。 流分析作業必須至少包含一個數據流輸入。 事件中心、IoT 中心和 Blob 存儲均可作為數據流輸入源。

 

 

  • 引用輸入是指:引用數據是完全靜態的或更改緩慢。 它通常用於執行關聯和查找。 例如,可以將數據流輸入中的數據聯接到引用數據中的數據,就像執行SQL 聯接以查找靜態值一樣。 當前支持將Azure Blob 存儲和 Azure SQL 數據庫作為參考數據的輸入源。 

2.輸出:

SQL Database,Blob Storage, Event Hub, Table Storage, Service Bus Queues, Service Bus Topics,Cosmos DB等等。

具體請參見 :

https://docs.azure.cn/zh-cn/stream-analytics/stream-analytics-define-outputs

3. 查詢:

查詢是使用類似SQL的語法進行數據過濾和計算。支持4種窗口函數:

翻轉開窗函數用於將數據流划分成不同的時間段並對其執行某個函數,如以下示例所示。 翻轉窗口的主要差異在於它們會重復,不重疊,並且一個事件不能屬於多個翻轉窗口。

 

跳躍開窗函數在一段固定的時間內向前跳躍。 人們往往將此類窗口視為可以重疊的翻轉窗口,因此一個事件可以屬於多個跳躍窗口結果集。 要使跳躍窗口與翻轉窗口相同,需將躍點大小指定為與窗口大小相同。

不同於翻轉或跳躍窗口,滑動開窗函數只在事件發生時生成輸出。 每個窗口至少有一個事件,並且窗口持續根據€ (epsilon) 向前移動。 與跳躍窗口一樣,事件可以屬於多個滑動窗口。

會話窗口函數對差不多同時到達的事件進行分組,篩選出沒有數據的時間段。 它具有三個主要參數:超時、最長持續時間和分區鍵(可選)。

 

實戰內容:請參照本文開頭的視頻。

實戰中用到的修改后的代碼:

import random
import time
import sys
​
# Using the Python Device SDK for IoT Hub:
#   https://github.com/Azure/azure-iot-sdk-python
# The sample connects to a device-specific MQTT endpoint on your IoT Hub.
import iothub_client
# pylint: disable=E0611
from iothub_client import IoTHubClient, IoTHubClientError, IoTHubTransportProvider, IoTHubClientResult
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError, DeviceMethodReturnValue
​
# The device connection string to authenticate the device with your IoT hub.
# Using the Azure CLI:
# az iot hub device-identity show-connection-string --hub-name {YourIoTHubName} --device-id MyNodeDevice --output table
CONNECTION_STRING = "HostName=iothubforsatest.azure-devices.cn;DeviceId=test0002;SharedAccessKey=vnYkfQ4znJqVow9ZeBsooyj5kYeJs96etpcUoQI/FwQ="# Using the MQTT protocol.
PROTOCOL = IoTHubTransportProvider.MQTT
MESSAGE_TIMEOUT = 10000# Define the JSON message to send to IoT Hub.
TEMPERATURE = 0.0
HUMIDITY = 60
MSG_TXT = "{\"temperature\": %.2f,\"humidity\": %.2f,\"deviceid\": 'test0002'}"def send_confirmation_callback(message, result, user_context):
    print ( "IoT Hub responded to message with status: %s" % (result) )
​
def iothub_client_init():
    # Create an IoT Hub client
    client = IoTHubClient(CONNECTION_STRING, PROTOCOL)
    return client
​
def iothub_client_telemetry_sample_run():
​
    try:
        client = iothub_client_init()
        print ( "IoT Hub device sending periodic messages, press Ctrl-C to exit" )
​
        while True:
            # Build the message with simulated telemetry values.
            temperature = TEMPERATURE + (random.random() * 15)
            humidity = HUMIDITY + (random.random() * 20)
            msg_txt_formatted = MSG_TXT % (temperature, humidity)
            message = IoTHubMessage(msg_txt_formatted)
​
            # Add a custom application property to the message.
            # An IoT hub can filter on these properties without access to the message body.
            prop_map = message.properties()
            if temperature > 30:
              prop_map.add("temperatureAlert", "true")
            else:
              prop_map.add("temperatureAlert", "false")
​
            # Send the message.
            print( "Sending message: %s" % message.get_string() )
            client.send_event_async(message, send_confirmation_callback, None)
            time.sleep(3)
​
    except IoTHubError as iothub_error:
        print ( "Unexpected error %s from IoTHub" % iothub_error )
        return
    except KeyboardInterrupt:
        print ( "IoTHubClient sample stopped" )
​
if __name__ == '__main__':
    print ( "IoT Hub Quickstart #1 - Simulated device" )
    print ( "Press Ctrl-C to exit" )
    iothub_client_telemetry_sample_run()
 

 

示例的查詢:

select System.Timestamp AS OutPutTime, deviceid AS DeviceName,Avg(temperature) AS temp into windowoutput from inputiot TIMESTAMP BY EventProcessedUtcTime GROUP BY SlidingWindow(second,30), deviceid HAVING Avg(temperature)>30;

 

本實戰基本流程為:

1.創建 IoTHub和模擬設備

2.創建流分析服務

3.創建SqlDatabase(作為輸出)

3.配置流分析的輸入

4.配置流分析的輸出

5.設計Query

6.檢查實時分析結果

注意事項:

輸出至SQL DB時,表必須提前創建好;

表中必須涵蓋所有輸出字段。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM