Skip to main content

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index] [List Home]
Re: [milo-dev] what is the best practice of build a opc ua server with milo?

Tony,

It seems your application has a deadlock. Can you get a thread dump (use jstack utility if you don't have any other means) while you're "frozen" on that line?

This modified subscription example demonstrates a working recovery from transfer failure: https://gist.github.com/kevinherron/9e4fc2831332ac37751c57bf7b2dd07d

If you run the example server, then run that modified subscription example, you'll see value changes print to the console. Then you can restart the example server and watch it recover and start printing values again.

On Tue, Aug 27, 2019 at 1:46 AM Tony Wei A <tony.a.wei@xxxxxxxxxxxx> wrote:

Hi Kevin,

 

I have two problems when I do the reconnecting work, if any good idea, thanks:

  1. I have implemented the method onSubscriptionTransferFailed and call it in the method connect in class OPcUaServerMonitor. But when the program run through the line UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get() in method setMonitoredItems(), the program just kept waiting here and can not continue. I do not know why, if any problem in my code?

The following two classed are my codes, I have marked the key codes to red:

Class OPcUaServerMonitor:

@Slf4j
public class OpcUaServerMonitor implements OpcUaDeviceAware {

    
private final GatewayService gateway;
    private final
OpcUaServerConfiguration configuration;

    private
OpcUaClient client;
    private
UaSubscription subscription;
    private
Map<NodeId, OpcUaDevice> devices;
    private
Map<NodeId, List<OpcUaDevice>> devicesByTags;
    private
Map<String, OpcUaDevice> devicesByName;
    private
Map<Pattern, DeviceMapping> mappings;
    private
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

    private final
AtomicLong clientHandles = new AtomicLong(1L);

    private
RpcProcessor rpcProcessor;

    public static
ExecutorService executorService = Executors.newFixedThreadPool(5);
    
/** 命名空间类型 */
    
private static final int NAMESPACE_TYPE_0 = 0;
    private static final int
NAMESPACE_TYPE_1 = 1;
    private static final int
NAMESPACE_TYPE_2 = 2;
    
/** 失败重连计数器 */
    
private static int TRANSFER_FAILED_COUNT = 0;
    private
UaSubscription.NotificationListener notificationListener;
    private
UaSubscriptionManager.SubscriptionListener subscriptionListener;

    public
OpcUaServerMonitor(GatewayService gateway, OpcUaServerConfiguration configuration) {
        
this.gateway = gateway;
        this
.configuration = configuration;
        this
.devices = new HashMap<>();
        this
.devicesByTags = new HashMap<>();
        this
.mappings = configuration.getMapping().stream().collect(Collectors.toMap(m -> Pattern.compile(m.getDeviceNodePattern()), Function.identity()));
        this
.devicesByName = new HashMap<>();
    
}

    
public void connect(Boolean isRemote) {
        
try {
            
log.info("Initializing OPC-UA server connection to [{}:{}]!", configuration.getHost(), configuration.getPort());

            
SecurityPolicy securityPolicy = SecurityPolicy.valueOf(configuration.getSecurity());
            
IdentityProvider identityProvider = configuration.getIdentity().toProvider();
            
            
log.info("================OPC UA Server Info================: {}", "opc.tcp://" + configuration.getHost() + ":" + configuration.getPort() + "/");
            
EndpointDescription[] endpoints = UaTcpStackClient.getEndpoints("opc.tcp://" + configuration.getHost() + ":" + configuration.getPort() + "/").get();

            
EndpointDescription endpoint = Arrays.stream(endpoints)
                    .filter(e -> e.getSecurityPolicyUri().equals(
securityPolicy.getSecurityPolicyUri()))
                    .findFirst().orElseThrow(() ->
new Exception("no desired endpoints returned"));

            
OpcUaClientConfig config = getOpcUaClientConfig(securityPolicy, identityProvider, endpoint, isRemote);

            
client = new OpcUaClient(config);
            
client.connect().get();

            
subscription = client.getSubscriptionManager().createSubscription(1000.0).get();
            
rpcProcessor = new RpcProcessor(gateway, client, this);

            
//scanForDevices();
            
doSubscribeByTags();

            
// 解决重连问题
    
client.getSubscriptionManager().addSubscriptionListener(new MySubscriptionListener(this));
        
} catch (Exception e) {
            
log.error("OPC-UA server connection failed!", e);
            throw new
RuntimeException("OPC-UA server connection failed!", e);
        
}
    }

    ……………………………..



    
/**
     *
通过BiConsumer消费订阅消息,然后回调函数onSubscriptionValue处理订阅消息
    
* @param newTags
     
* @throws InterruptedException
     * @throws ExecutionException
     */
    
private void subscribeToTags(Map<String, NodeId> newTags) throws InterruptedException, ExecutionException {
        List<MonitoredItemCreateRequest> requests =
new ArrayList<>();
        for
(Map.Entry<String, NodeId> kv : newTags.entrySet()) {
            
// subscribe to the Value attribute of the server's CurrentTime node
            
ReadValueId readValueId = new ReadValueId(
                    kv.getValue()
,
                    
AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
            
// important: client handle must be unique per item
            
UInteger clientHandle = uint(clientHandles.getAndIncrement());

            
MonitoringParameters parameters = new MonitoringParameters(
                    clientHandle
,
                    
1000.0,     // sampling interval
                    
null,       // filter, null means use default
                    
uint(10),   // queue size
                    
true        // discard oldest
            
);

            
requests.add(new MonitoredItemCreateRequest(
                    readValueId
, MonitoringMode.Reporting, parameters));
        
}

        BiConsumer<UaMonitoredItem
, Integer> _onItemCreated_ =
                (item
, id) -> item.setValueConsumer(this::onSubscriptionValue);

        
List<UaMonitoredItem> items = subscription.createMonitoredItems(
                TimestampsToReturn.
Both,
                
requests,
                
onItemCreated
        ).get()
;

        for
(UaMonitoredItem item : items) {
            
if (item.getStatusCode().isGood()) {
                
log.trace("Monitoring Item created for nodeId={}", item.getReadValueId().getNodeId());
            
} else {
                
log.warn("Failed to create item for nodeId={} (status={})",
                        
item.getReadValueId().getNodeId(), item.getStatusCode());
            
}
        }
    }

    
private void onSubscriptionValue(UaMonitoredItem item, DataValue dataValue) {
        
log.info("#Subscription value received: item={}, value={}",
                
item.getReadValueId().getNodeId(), dataValue.getValue());
      
// 将接收到的数据记录到TXT文件中,示例format2019.08.20 12:00:00 item=NodeId{ns=2, id=Simulator1.Device1.Tag1}, value=Variant{value=11868}
        
OpcUaUtils.writeStrToTxt("item=" + item.getReadValueId().getNodeId() + ", value=" + dataValue.getValue());
        
NodeId tagId = item.getReadValueId().getNodeId();
        
devicesByTags.getOrDefault(tagId, Collections.emptyList()).forEach(
                device -> {
                    device.updateTag(
tagId, dataValue);
                    
List<KvEntry> attributes = device.getAffectedAttributes(tagId, dataValue);
                    if
(attributes.size() > 0) {
                        
gateway.onDeviceAttributesUpdate(device.getDeviceName(), attributes);
                    
}
                    List<TsKvEntry> timeseries = device.getAffectedTimeseries(
tagId, dataValue);
                    if
(timeseries.size() > 0) {
                        
gateway.onDeviceTelemetry(device.getDeviceName(), timeseries);
                    
}
                }
        )
;
   
}

    ……………………………..


    
public void setMonitoredItems()
        
throws InterruptedException, ExecutionException, IOException {
        
UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get(); // ????????????? just waiting here and can not continue

        try
{
            NodeId nodeId =
new NodeId(2, "通道1.设备1.目标1");
            
Map<String, NodeId> newTags = new HashMap<>();
            
newTags.put("目标1", nodeId);

            
List<MonitoredItemCreateRequest> requests = new ArrayList<>();
            for
(Map.Entry<String, NodeId> kv : newTags.entrySet()) {
                
// subscribe to the Value attribute of the server's CurrentTime node
                
ReadValueId readValueId = new ReadValueId(
                    kv.getValue()
,
                    
AttributeId.Value.uid(), null, QualifiedName.NULL_VALUE);
                
// important: client handle must be unique per item
                
UInteger clientHandle = uint(clientHandles.getAndIncrement());

                
MonitoringParameters parameters = new MonitoringParameters(
                    clientHandle
,
                    
1000.0,     // sampling interval
                    
null,       // filter, null means use default
                    
uint(10),   // queue size
                    
true        // discard oldest
                
);

                
requests.add(new MonitoredItemCreateRequest(
                    readValueId
, MonitoringMode.Reporting, parameters));
            
}

            BiConsumer<UaMonitoredItem
, Integer> _onItemCreated_ =
                (item
, id) -> item.setValueConsumer(this::onSubscriptionValue);


            
List<UaMonitoredItem> items = subscription.createMonitoredItems(
                TimestampsToReturn.
Both,
                
requests,
                
onItemCreated
            ).get()
;
        
}catch (InterruptedException e){
            e.printStackTrace()
;
        
}catch (ExecutionException e){
            e.printStackTrace()
;
        
}
    }
}

               

 

Class MySubscriptionListener:

@Slf4j
public class MySubscriptionListener implements UaSubscriptionManager.SubscriptionListener{

    
private OpcUaServerMonitor monitor;

    public
MySubscriptionListener(OpcUaServerMonitor monitor){
        
this.monitor = monitor;
    
}

    
private void onSubscriptionValue(UaMonitoredItem item, DataValue dataValue) {
        
log.info("#Subscription value received: item={}, value={}",
            
item.getReadValueId().getNodeId(), dataValue.getValue());
        
// 将接收到的数据记录到TXT文件中,示例format2019.08.20 12:00:00 item=NodeId{ns=2, id=Simulator1.Device1.Tag1}, value=Variant{value=11868}
        
OpcUaUtils.writeStrToTxt("item=" + item.getReadValueId().getNodeId() + ", value=" + dataValue.getValue());
    
}

    
// 重连之后,重新进行tag订阅
   
@Override
    
public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {

        
try{
            
monitor.setMonitoredItems();
        
}catch (InterruptedException e){
            e.printStackTrace()
;
        
}catch (ExecutionException e){
            e.printStackTrace()
;
        
}catch (IOException e){
            e.printStackTrace()
;
        
}

    }
}

 

  1. I have implemented the method onSubscriptionTransferFailed just as in my problem 1. The following two scenes I got different results
  1. I cut down the net connection between opcua client and server, but they still keep running. Then I recovery the net connection.

Result: Client can reconnect successfully and also can call the method onSubscriptionTransferFailed. The result is normal.

  1. The net connection between client and server is normal, and then I stop the server and restart it.

Result: the client can reconnect to the server but can not trigger the call of method onSubscriptionTransferFailed.

 

Thanks so much!

 

Regards,

Tony

 

From: milo-dev-bounces@xxxxxxxxxxx <milo-dev-bounces@xxxxxxxxxxx> On Behalf Of Kevin Herron
Sent: Monday, August 26, 2019 08:04 PM
To: milo developer discussions <milo-dev@xxxxxxxxxxx>
Subject: Re: [milo-dev] what is the best practice of build a opc ua server with milo?

 

@shigeru,

 

I don't think you need to check the StatusCode, it's mostly informative. You should re-create the subscription regardless of what the StatusCode will be. You're currently not handling the case where the server doesn't support the subscription transfer service at all, in which case the StatusCode could be any of:

 

Bad_NotImplemented
Bad_NotSupported
Bad_OutOfService
Bad_ServiceUnsupported

 

The StatusCode received as a parameter is ultimately whatever the server sent back at either the operation level or service level, and it can differ because not all servers are implemented the same way.

 

On Mon, Aug 26, 2019 at 4:41 AM Kevin Herron <kevinherron@xxxxxxxxx> wrote:

As of 0.3.0 Milo client automatically has a heartbeat between client and server. 

 

All of the state machine in OPC UA Part 4, Section 6.5 is implemented by the client SDK, including subscription transfers. You don't need to do anything to make the heartbeat work or make subscription transfers take place.

 

You do need to implement the SubscriptionListener.onSubscriptionTransferFailed() method, though, and re-create the subscription when that method is invoked. This happens when a reconnect was successful but subscription transfer failed for some reason (often the server doesn't support it or it no longer has the subscription because e.g. it restarted).

 

On Mon, Aug 26, 2019 at 3:30 AM shigeru ishida <s5u.ishida@xxxxxxxxx> wrote:

The specifications is available here. User registration is required.

https://opcfoundation.org/developer-tools/specifications-unified-architecture

 

2019826() 18:06 Tony Wei A <tony.a.wei@xxxxxxxxxxxx>:

Hi Shigeru,

 

Where can I find the Part 4: Services and 6.5 Re-establishing connections you mentioned in your last mail?

And I have implemented the method onSubscriptionTransferFailed according to your guiding.

 

Thanks a lot!

 

Tony

 

From: milo-dev-bounces@xxxxxxxxxxx <milo-dev-bounces@xxxxxxxxxxx> On Behalf Of shigeru ishida
Sent: Monday, August 26, 2019 03:56 PM
To: milo developer discussions <milo-dev@xxxxxxxxxxx>
Subject: Re: [milo-dev] what is the best practice of build a opc ua server with milo?

 

For your reference, I implemented onSubscriptionTransferFailed() in the OPC-UA client of my tool,
referring to the following specifications.

 

OPC Unified Architecture Specification Part 4: Services
- 6.5 Re-establishing connections

 

Now when the server is restarted, the client continues processing.

 

 client.getSubscriptionManager().addSubscriptionListener(new MySubscriptionListener());

.....

.....

public class MySubscriptionListener implements UaSubscriptionManager.SubscriptionListener {
    @Override
    public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
        if ((statusCode.getValue() == StatusCodes.Bad_SubscriptionIdInvalid) ||
            (statusCode.getValue() == StatusCodes.Bad_MessageNotAvailable)) {

            .....

            .....
            List<UaMonitoredItem> items = subscription.createMonitoredItems(.....).get();
            .....
            ..... 
        }
    }
}

 

 

2019826() 15:03 Tony Wei A <tony.a.wei@xxxxxxxxxxxx>:

Hi,

 

If heartbeat exists between OPCUA client and server in milo? I have a problem about reconnection mechanism that when the OPC UA server does not work and normal again, how does the client  reconnect to OPCUA server?

 

Thanks!

 

Regards,

Tony

 

 

From: milo-dev-bounces@xxxxxxxxxxx <milo-dev-bounces@xxxxxxxxxxx> On Behalf Of Kevin Herron
Sent: Saturday, August 24, 2019 07:29 PM
To: milo developer discussions <milo-dev@xxxxxxxxxxx>
Subject: Re: [milo-dev] what is the best practice of build a opc ua server with milo?

 

I'm not sure there's any reason to implement your own NodeManager.

 

It's not possible to implement your own SessionManager and plug that into the SDK right now.

 

On Fri, Aug 23, 2019 at 7:50 PM 黄超 <longren239368608@xxxxxxx> wrote:

actually, I think I should implement all the methods, because the data can be modified by other systems. Browse, Read, Write, Subscription.. All of them should be implemented. That is plenty of work. And I found I should create a subclass of ManagedNamespace, and implement my own NodeManager and SessionManager. I don't know if it is correct.


At 2019-08-23 19:37:49, "Kevin Herron" <
kevinherron@xxxxxxxxx> wrote:

also* used, not always used.

 

On Fri, Aug 23, 2019 at 4:37 AM Kevin Herron <kevinherron@xxxxxxxxx> wrote:

I should mention I've always used a hybrid approach, where I create UaNodes to model everything, but still override read() and write().

 

When the attribute is Value I do something special like read from or write to a PLC, but for any other attribute I just read/write from the UaNode like the managed address space implementation would.

 

On Fri, Aug 23, 2019 at 4:27 AM Kevin Herron <kevinherron@xxxxxxxxx> wrote:

You seem to have a pretty good grasps of the tradeoffs between the 2 approaches.

 

When there's too much data being modeled to reasonable fit into memory I use approach #1 - override the read() and write() on AttributeServices interface (which is part of AddressSpaceServices interface). This requires more work but allows you to implement reads/writes in such a way that you never even have to have an instance in memory of UaNode for the NodeId being read from or written to.

 

In fact, you'll notice all of the APIs you implement from AddressSpaceServices allow for this possibility. The SDK was carefully designed to minimize the amount of *required* nodes you keep in memory.

 

This makes things more difficult to implement though, which is where approach #2 comes in - use UaNodes for everything and let the ManagedAddressSpace implementation handle reads and writes against the nodes. When you do this approach you have to either install a bunch of AttributeDelegate (or AttributeFilter in 0.4 branch, which replaces deprecated AttributeDelegate), or handle syncing data from your external system to the UaNode instances periodically so that the data is refreshed.

 

Unfortunately there's no public example of approach #1 demonstrating a "real life" system, but I'm hoping to have one by the end of the year.

 

On Fri, Aug 23, 2019 at 2:21 AM 黄超 <longren239368608@xxxxxxx> wrote:

I am working on a poc of opc ua server demo with milo. I have a backend system that is already running, and I want to make it an opc ua server. Other opc ua clients can commucate with this server using services defined in opc ua spec such as addNodes, read, write, browse, query, call method, createSubcription...

I encounts some problems, and I have some questions:

1. when receives `read` requests from clients, how the server obtains data from my existing backend system? Choice-1, with implementing some interfaces or overriding some methods, when the server receives `read` requests from clients, it can invoke overriding methods to read real data from backend system. I tried it, but found it difficulty. Did I find a wrong way? Choice-2, Read the data from backend system to the ua server's memory with `write`, then no methods would be overrided. In this case, all data would be loaded into memory.

2. If the best practice is choice-1, what should I do? What interfaces should be implemented?

3. If the best practice is choice-2, huge data would be the new problem. It is not a good idea to load them all into the memory.

4. If all data is loaded into memory, when original data changes, sync will be another problem.

 

 

_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev

 

 

_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev

_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev

_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev

_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev


Back to the top