Subscription中的監控項(Monitored Item)常用的監控模式(MonitoringMode)有兩種,一種是Reporting,另一種是Sampling。
如果監控項定義為Sampling,則它是一個Triggered Item,即被激發的監控項,需要一個定義為Reporting的監控項(稱為Triggering Item)與它連接。這樣當Triggering Item更新時,會激發Triggered Item一併上報更新。代碼如下:
public class TriggeringExample implements UaClient { // public static void main(String[] args) throws Exception { // TriggeringExample example = new TriggeringExample(); // // new ClientExampleRunner(example).run(); // } private final Logger logger = LoggerFactory.getLogger(getClass()); private final AtomicLong clientHandles = new AtomicLong(1L); @Override public void run(OpcUaClient client, CompletableFuture<OpcUaClient> future) throws Exception { // synchronous connect client.connect().get(); // create a subscription @ 1000ms 一個訂閱可以包含多個監控item UaSubscription subscription = client.getSubscriptionManager() .createSubscription(1000.0) .get(); // subscribe to a static value that reports ReadValueId readValueId1 = new ReadValueId( new NodeId(2, "ch1.d1.tag1"), AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE ); //創建監控item, 第一個為Reporting mode UaMonitoredItem reportingItem = createMonitoredItem(subscription, readValueId1, MonitoringMode.Reporting); //創建第二個監控item, 它是Sampling mode,需要第一個項激發 ReadValueId readValueId2 = new ReadValueId( new NodeId(2, "ch1.d1.tag2"), AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE ); UaMonitoredItem samplingItem = createMonitoredItem(subscription, readValueId2, MonitoringMode.Sampling); //將Triggering Item與Triggered item(可以有多個)連接起來(注意這里用了異步模式) subscription.addTriggeringLinks(reportingItem, newArrayList(samplingItem)).get(); // trigger reporting of both by writing to the static item and changing its value //VariableNode node = client.getAddressSpace().createVariableNode( // new NodeId(2, "ch1.d1.tag1")); //通過改變Triggering Item來激發Triggered item向我發送消息,注意這里writeValue使用了異步模式,如果沒有這個get(),只能得到一兩次消息更新 for(int i=0; i<10000; i++){ Variant newVal = new Variant(Unsigned.ushort(i)); DataValue va = new DataValue(newVal, null, null); client.writeValue(new NodeId(2, "ch1.d1.tag1"), va).get(); } //node.writeValue(va); // client.writeValue( // new NodeId(2, "ch1.d1.tag1"), // va // ).get(); // let the example run for 5 seconds then terminate 
Thread.sleep(500000); future.complete(client); } private UaMonitoredItem createMonitoredItem( UaSubscription subscription, ReadValueId readValueId, MonitoringMode monitoringMode ) throws ExecutionException, InterruptedException { // important: client handle must be unique per item UInteger clientHandle = uint(clientHandles.getAndIncrement()); MonitoringParameters parameters = new MonitoringParameters( clientHandle, 1000.0, null, uint(10), true ); MonitoredItemCreateRequest request = new MonitoredItemCreateRequest( readValueId, monitoringMode, parameters ); BiConsumer<UaMonitoredItem, Integer> onItemCreated = (item, id) -> item.setValueConsumer(this::onSubscriptionValue); List<UaMonitoredItem> items = subscription.createMonitoredItems( TimestampsToReturn.Both, newArrayList(request), onItemCreated ).get(); return items.get(0); } private void onSubscriptionValue(UaMonitoredItem item, DataValue value) { logger.info( "回調:subscription value received: item={}, value={}", item.getReadValueId().getNodeId(), value.getValue()); } }