opcua協議介紹


opc ua

是一種應用層協議,基於tcp之上,其url通常為opc.tcp://127.0.0.1:4840/abc,在opc ua中常被稱為endpoint

兩種模式

opc ua支持c/s模式,同時也支持類似mqtt的發布訂閱模式,通常各種設備作為opc ua的服務端提供各種服務。

k1yyi693ha

信息模型

opc ua采用面向對象的設計思路, 使用了對象(objects)作為過程系統表示數據和活動的基礎。對象包含了變量事件方法,它們通過引用(reference)來互相連接。

OPC UA 信息模型是節點的網絡(Network of Node,),或者稱為結構化圖(graph),由節點(node)和引用(References)組成,這種結構圖稱之為OPC UA 的地址空間。這種圖形結構可以描述各種各樣的結構化信息(對象)。

20190403111143907

注意⚠️:opc ua中所說的節點是在一個opc ua服務器中,不要理解為一個服務器對應一個node

節點

opc ua定義了8種類型的節點

對象(Object)
對象類型(Object Type)
變量(Variable)
變量類型(Variable Type)
方法(Method)
視圖(View)
引用(Reference)
數據類型(Data Type)

每種節點都包含一些公共屬性,如下:

屬性 數據類型 說明
NodeId NodeId 在OPC UA服務器內唯一確定的一個節點,並且在OPC UA服務器中定位該節點
NodeClass Int32 該節點的類型(上面列出的8種之一)
BrowseName QualifiedName 瀏覽OPC UA服務器事定義的節點。它是非本地化的
DisplayName LocalizedText 包含節點的名字,用來在用戶接口中顯示名字,本地化
Description LocalizedText 本地化的描述(可選)
WriteMask Uint32 指示哪個節點屬性是可寫的,即可被OPC UA客戶端修改(可選)
UserWriteMask Uint32 指示哪個節點屬性可以被當前連接到服務器上的用戶修改(可選)

除了數據類型節點之外,其他各個節點都有額外的專屬屬性

20190403111321142

引用

引用描述了兩個節點之間的關系,用來連接多個節點。OPC UA預定義了多種引用,常見的引用有:

  • hasTypeDefinition

描述對象、變量和類型之間的關系

  • ObjectNode的hasTypeDefinition引用,指向了一個ObjectTypeNode,表示該ObjectNode的類型;
  • VariableNode的hasTypeDefinition引用,指向一個VariableTypeNode,表示該 VariableNode的類型。
  • hasSubType

描述對象的擠成關系,當子類從父類繼承后,子類擁有一個hasSubType引用指向父類。

  • hasComponents

描述一種組合關系

  • ObjectNode一般都由多個VariableNode組成,ObjectNode包含某個VariableNode時,ObjectNode擁有一個hasComponents引用,指向該VariableNode;
  • VariableNode也可以包含子VariableNode,此時也用hasComponents描述它們的關系。
  • Organizes

指明兩個節點的層次結構,通過organizes可以把多個節點組織到同一個父節點下。

完整引用如下

opc_ua_refs

服務

服務可以看成是OPC UA服務器提供的API集合,OPC UA與定義了37個標准服務,常用的服務有:

  • 讀寫服務

可以獲取和修改服務器指定節點指定屬性的值

  • 調用服務

執行服務器上指定節點的方法

  • 訂閱數據變化和訂閱事件

可以監控服務器數據的變化

opc ua編程

Sdk

客戶端

  • opcua-client-gui

    使用python(pyqt5)開發使用pip可以安裝,跨平台

    sudo pip3 install pyqt5 -i https://pypi.mirrors.ustc.edu.cn/simple/
    sudo pip3 install numpy -i https://pypi.mirrors.ustc.edu.cn/simple/
    sudo pip3 install pyqtgraph -i https://pypi.mirrors.ustc.edu.cn/simple/
    sudo pip3 install cryptography -i https://pypi.mirrors.ustc.edu.cn/simple/
    sudo pip3 install opcua-client -i https://pypi.mirrors.ustc.edu.cn/simple/
    

模擬設備

可利用sdk自己開發 見下面的python demo

golang Demo

讀取服務器數據

package main

import (
	"context"
	"log"

	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
)

func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"

	ctx := context.Background()

	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	id, err := ua.ParseNodeID(nodeID)
	if err != nil {
		log.Fatalf("invalid node id: %v", err)
	}

	req := &ua.ReadRequest{
		MaxAge:             2000,
		NodesToRead:        []*ua.ReadValueID{{NodeID: id}},
		TimestampsToReturn: ua.TimestampsToReturnBoth,
	}

	resp, err := c.Read(req)
	if err != nil {
		log.Fatalf("Read failed: %s", err)
	}
	if resp.Results[0].Status != ua.StatusOK {
		log.Fatalf("Status not OK: %v", resp.Results[0].Status)
	}
	log.Printf("%#v", resp.Results[0].Value.Value())
}

向服務器寫數據

package main

import (
	"context"
	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
	"log"
)

func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"

	ctx := context.Background()

	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	id, err := ua.ParseNodeID(nodeID)
	if err != nil {
		log.Fatalf("invalid node id: %v", err)
	}

	v, err := ua.NewVariant(10.0)
	if err != nil {
		log.Fatalf("invalid value: %v", err)
	}

	req := &ua.WriteRequest{
		NodesToWrite: []*ua.WriteValue{
			{
				NodeID:      id,
				AttributeID: ua.AttributeIDValue,
				Value: &ua.DataValue{
					EncodingMask: ua.DataValueValue,
					Value:        v,
				},
			},
		},
	}

	resp, err := c.Write(req)
	if err != nil {
		log.Fatalf("Read failed: %s", err)
	}
	log.Printf("%v", resp.Results[0])
}

監聽服務器數據變化

package main

import (
	"context"
	"github.com/gopcua/opcua/monitor"
	"log"
	"os"
	"os/signal"
	"sync"
	"time"

	"github.com/gopcua/opcua"
	"github.com/gopcua/opcua/ua"
)

func cleanup(sub *monitor.Subscription, wg *sync.WaitGroup) {
	log.Printf("stats: sub=%d delivered=%d dropped=%d", sub.SubscriptionID(), sub.Delivered(), sub.Dropped())
	sub.Unsubscribe()
	wg.Done()
}

func startCallbackSub(ctx context.Context, m *monitor.NodeMonitor, interval, lag time.Duration, wg *sync.WaitGroup, nodes ...string) {
	sub, err := m.Subscribe(
		ctx,
		&opcua.SubscriptionParameters{
			Interval: interval,
		},
		func(s *monitor.Subscription, msg *monitor.DataChangeMessage) {
			if msg.Error != nil {
				log.Printf("[callback] error=%s", msg.Error)
			} else {
				log.Printf("[callback] node=%s value=%v", msg.NodeID, msg.Value.Value())
			}
			time.Sleep(lag)
		},
		nodes...)

	if err != nil {
		log.Fatal(err)
	}

	defer cleanup(sub, wg)

	<-ctx.Done()
}

func main() {
	endpoint := "opc.tcp://milo.digitalpetri.com:62541/milo"
	nodeID := "ns=2;s=Dynamic/RandomFloat"

	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	go func() {
		<-signalCh
		println()
		cancel()
	}()

	c := opcua.NewClient(endpoint, opcua.SecurityMode(ua.MessageSecurityModeNone))
	if err := c.Connect(ctx); err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	m, err := monitor.NewNodeMonitor(c)
	if err != nil {
		log.Fatal(err)
	}

	m.SetErrorHandler(func(_ *opcua.Client, sub *monitor.Subscription, err error) {
		log.Printf("error: sub=%d err=%s", sub.SubscriptionID(), err.Error())
	})
	wg := &sync.WaitGroup{}

	// start callback-based subscription
	wg.Add(1)
	go startCallbackSub(ctx, m, time.Second, 0, wg, nodeID)

	<-ctx.Done()
	wg.Wait()
}

python opcua server demo

#!/usr/bin/env python3

from threading import Thread
import random
import time
from opcua import ua, uamethod, Server

@uamethod
def set_temperature(parent, variant):
    print(f"set_temperature {variant.Value}")
    temperature_thread.temperature.set_value(variant.Value)

@uamethod
def set_onoff(parent, variant):
    print(f"set_onoff {variant.Value}")
    temperature_thread.temperature.set_value(variant.Value)

# 這個類用於后台定時隨機修改值
class Temperature(Thread):
    def __init__(self, temperature, onoff):
        Thread.__init__(self)
        self._stop = False
        self.temperature = temperature
        self.onoff = onoff

    def stop(self):
        self._stop = True

    def run(self):
        count = 1
        while not self._stop:
            value = random.randint(-20, 100)
            self.temperature.set_value(value)
            print(f"random set temperature {value}")

            value = bool(random.randint(0, 1))
            self.onoff.set_value(value)
            print(f"random set onoff {value}")

            led_event.event.Message = ua.LocalizedText("high_temperature %d" % count)
            led_event.event.Severity = count
            #led_event.event.temperature = random.randint(60, 100)
            led_event.event.onoff = bool(random.randint(0, 1))
            led_event.trigger()
            count += 1

            time.sleep(10)

if __name__ == "__main__":
    # now setup our server
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:40840/tuyaopcua/server/")
    server.set_server_name("TuyaOpcUa Driver Demo Device")

    # set all possible endpoint policies for clients to connect through
    server.set_security_policy([
        ua.SecurityPolicyType.NoSecurity,
        ua.SecurityPolicyType.Basic128Rsa15_SignAndEncrypt,
        ua.SecurityPolicyType.Basic128Rsa15_Sign,
        ua.SecurityPolicyType.Basic256_SignAndEncrypt,
        ua.SecurityPolicyType.Basic256_Sign])

    # setup our own namespace
    uri = "http://tuya.com"
    idx = server.register_namespace(uri)

    # 添加一個 `空調` 對象
    air_conditioner = server.nodes.objects.add_object(idx, "AirConditioner")
    temperature = air_conditioner.add_variable(idx, "temperature", 20)
    temperature.set_writable()
    onoff = air_conditioner.add_variable(idx, "onoff", True)
    onoff.set_writable()


    air_conditioner.add_method(idx, "set_temperature", set_temperature, [ua.VariantType.UInt32])
    air_conditioner.add_method(idx, "set_onoff", set_onoff, [ua.VariantType.Boolean])

    # creating a default event object, the event object automatically will have members for all events properties
    led_event_type = server.create_custom_event_type(idx,
                                                     'high_temperature',
                                                     ua.ObjectIds.BaseEventType,
                                                     [('temperature', ua.VariantType.UInt32), ('onoff', ua.VariantType.Boolean)])

    led_event = server.get_event_generator(led_event_type, air_conditioner)
    led_event.event.Severity = 300

    # start opcua server
    server.start()
    print("Start opcua server...")

    temperature_thread = Temperature(temperature, onoff)
    temperature_thread.start()

    try:

        led_event.trigger(message="This is BaseEvent")

        while True:
            time.sleep(5)
    finally:
        print("Exit opcua server...")
        temperature_thread.stop()
        server.stop()


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM