Milo-OPC UA處理Subscription和Triggering


Subscription有兩種模式,一種是Reporting,另一種是Sampling。

如果定義為Sampling,則這個Subscription是一個Triggered Item,即被激發的訂閱,需要一個定義為Reporting的Subscription(稱為Triggering Item)與它連接。這樣當Triggering Item更新時,會激發Triggered Item更新。代碼如下:

public class TriggeringExample implements UaClient {

// public static void main(String[] args) throws Exception {
// TriggeringExample example = new TriggeringExample();
//
// new ClientExampleRunner(example).run();
// }

private final Logger logger = LoggerFactory.getLogger(getClass());

private final AtomicLong clientHandles = new AtomicLong(1L);

@Override
public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception {
// synchronous connect
client.connect().get();

// create a subscription @ 1000ms  一個訂閱可以包含多個監控item
UaSubscription subscription = client.getSubscriptionManager()
.createSubscription(1000.0)
.get();

// subscribe to a static value that reports
ReadValueId readValueId1 = new ReadValueId(
new NodeId(2, "ch1.d1.tag1"),
AttributeId.Value.uid(),
null,
QualifiedName.NULL_VALUE
);

//創建監控item, 第一個為Reporting mode

UaMonitoredItem reportingItem = createMonitoredItem(subscription, readValueId1, MonitoringMode.Reporting);

//創建第二個監控item, 它是Sampling mode,需要第一個項激發
ReadValueId readValueId2 = new ReadValueId(
new NodeId(2, "ch1.d1.tag2"),
AttributeId.Value.uid(),
null,
QualifiedName.NULL_VALUE
);


UaMonitoredItem samplingItem = createMonitoredItem(subscription, readValueId2, MonitoringMode.Sampling);

 

//將Triggering Item與Triggered item(可以有多個)連接起來(注意這里用了異步模式)

subscription.addTriggeringLinks(reportingItem, newArrayList(samplingItem)).get();

// trigger reporting of both by writing to the static item and changing its value
//VariableNode node = client.getAddressSpace().createVariableNode(
// new NodeId(2, "ch1.d1.tag1"));

//通過改變Triggering Item來激發Triggered item向我發送消息,注意這里writeValue使用了異步模式,如果沒有這個get(),只能得到一兩次消息更新
for(int i=0; i<10000; i++){
Variant newVal = new Variant(Unsigned.ushort(i));
DataValue va = new DataValue(newVal, null, null);
client.writeValue(new NodeId(2, "ch1.d1.tag1"), va).get();

}
//node.writeValue(va);

// client.writeValue(
// new NodeId(2, "ch1.d1.tag1"),
// va
// ).get();

// let the example run for 5 seconds then terminate
Thread.sleep(500000);
future.complete(client);
}

private UaMonitoredItem createMonitoredItem(
UaSubscription subscription,
ReadValueId readValueId,
MonitoringMode monitoringMode
) throws ExecutionException, InterruptedException {

// important: client handle must be unique per item
UInteger clientHandle = uint(clientHandles.getAndIncrement());

MonitoringParameters parameters = new MonitoringParameters(
clientHandle,
1000.0,
null,
uint(10),
true
);

MonitoredItemCreateRequest request = new MonitoredItemCreateRequest(
readValueId,
monitoringMode,
parameters
);

BiConsumer<UaMonitoredItem, Integer> onItemCreated =
(item, id) -> item.setValueConsumer(this::onSubscriptionValue);

List<UaMonitoredItem> items = subscription.createMonitoredItems(
TimestampsToReturn.Both,
newArrayList(request),
onItemCreated
).get();

return items.get(0);
}

private void onSubscriptionValue(UaMonitoredItem item, DataValue value) {
logger.info(
"回調:subscription value received: item={}, value={}",
item.getReadValueId().getNodeId(), value.getValue());
}

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM