Hi Kevin,
I have two problems when I do the reconnecting work, if any good idea, thanks:
- I have implemented the method onSubscriptionTransferFailed and call it in the method connect in class OPcUaServerMonitor. But when the program run through the line
UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get() in method setMonitoredItems(), the program just kept waiting here and can not continue. I do not know why, if any problem in my code?
The following two classed are my codes, I have marked the key codes to red:
Class OPcUaServerMonitor:
@Slf4j
public class
OpcUaServerMonitor
implements
OpcUaDeviceAware {
private final
GatewayService
gateway;
private final OpcUaServerConfiguration
configuration;
private OpcUaClient
client;
private UaSubscription
subscription;
private Map<NodeId,
OpcUaDevice>
devices;
private Map<NodeId,
List<OpcUaDevice>>
devicesByTags;
private Map<String,
OpcUaDevice>
devicesByName;
private Map<Pattern,
DeviceMapping>
mappings;
private ScheduledExecutorService
executor
= Executors.newSingleThreadScheduledExecutor();
private final AtomicLong
clientHandles
=
new
AtomicLong(1L);
private RpcProcessor
rpcProcessor;
public static ExecutorService
executorService
= Executors.newFixedThreadPool(5);
/**
命名空间类型
*/
private static final int
NAMESPACE_TYPE_0
=
0;
private static final int NAMESPACE_TYPE_1
=
1;
private static final int NAMESPACE_TYPE_2
=
2;
/**
失败重连计数器
*/
private static int
TRANSFER_FAILED_COUNT
=
0;
private UaSubscription.NotificationListener
notificationListener;
private UaSubscriptionManager.SubscriptionListener
subscriptionListener;
public OpcUaServerMonitor(GatewayService gateway,
OpcUaServerConfiguration configuration) {
this.gateway
= gateway;
this.configuration
= configuration;
this.devices
=
new
HashMap<>();
this.devicesByTags
=
new
HashMap<>();
this.mappings
= configuration.getMapping().stream().collect(Collectors.toMap(m -> Pattern.compile(m.getDeviceNodePattern()),
Function.identity()));
this.devicesByName
=
new
HashMap<>();
}
public void
connect(Boolean isRemote) {
try
{
log.info("Initializing
OPC-UA server connection to [{}:{}]!",
configuration.getHost(),
configuration.getPort());
SecurityPolicy securityPolicy = SecurityPolicy.valueOf(configuration.getSecurity());
IdentityProvider identityProvider =
configuration.getIdentity().toProvider();
log.info("================OPC
UA Server Info================: {}",
"opc.tcp://"
+
configuration.getHost() +
":"
+
configuration.getPort() +
"/");
EndpointDescription[] endpoints = UaTcpStackClient.getEndpoints("opc.tcp://"
+
configuration.getHost() +
":"
+
configuration.getPort() +
"/").get();
EndpointDescription endpoint = Arrays.stream(endpoints)
.filter(e -> e.getSecurityPolicyUri().equals(securityPolicy.getSecurityPolicyUri()))
.findFirst().orElseThrow(() -> new
Exception("no desired endpoints returned"));
OpcUaClientConfig config = getOpcUaClientConfig(securityPolicy,
identityProvider,
endpoint,
isRemote);
client
=
new
OpcUaClient(config);
client.connect().get();
subscription
=
client.getSubscriptionManager().createSubscription(1000.0).get();
rpcProcessor
=
new
RpcProcessor(gateway,
client, this);
//scanForDevices();
doSubscribeByTags();
//
解决重连问题
client.getSubscriptionManager().addSubscriptionListener(new MySubscriptionListener(this));
}
catch
(Exception e) {
log.error("OPC-UA
server connection failed!",
e);
throw new RuntimeException("OPC-UA server connection
failed!",
e);
}
}
……………………………..
/**
* 通过BiConsumer消费订阅消息,然后回调函数onSubscriptionValue处理订阅消息
*
@param newTags
*
@throws InterruptedException
* @throws ExecutionException
*/
private void
subscribeToTags(Map<String,
NodeId> newTags)
throws
InterruptedException,
ExecutionException {
List<MonitoredItemCreateRequest> requests = new
ArrayList<>();
for (Map.Entry<String,
NodeId> kv : newTags.entrySet()) {
// subscribe to the Value attribute of the server's CurrentTime node
ReadValueId readValueId =
new
ReadValueId(
kv.getValue(),
AttributeId.Value.uid(),
null, QualifiedName.NULL_VALUE);
// important: client handle must be unique per item
UInteger clientHandle =
uint(clientHandles.getAndIncrement());
MonitoringParameters parameters =
new
MonitoringParameters(
clientHandle,
1000.0, //
sampling interval
null, // filter, null means use default
uint(10),
// queue size
true // discard oldest
);
requests.add(new
MonitoredItemCreateRequest(
readValueId,
MonitoringMode.Reporting,
parameters));
}
BiConsumer<UaMonitoredItem,
Integer> _onItemCreated_ =
(item,
id) -> item.setValueConsumer(this::onSubscriptionValue);
List<UaMonitoredItem> items =
subscription.createMonitoredItems(
TimestampsToReturn.Both,
requests,
onItemCreated
).get();
for (UaMonitoredItem item : items) {
if
(item.getStatusCode().isGood()) {
log.trace("Monitoring
Item created for nodeId={}",
item.getReadValueId().getNodeId());
}
else
{
log.warn("Failed
to create item for nodeId={} (status={})",
item.getReadValueId().getNodeId(),
item.getStatusCode());
}
}
}
private void
onSubscriptionValue(UaMonitoredItem item,
DataValue dataValue) {
log.info("#Subscription
value received: item={}, value={}",
item.getReadValueId().getNodeId(),
dataValue.getValue());
//
将接收到的数据记录到TXT文件中,示例format:2019.08.20
12:00:00 item=NodeId{ns=2, id=Simulator1.Device1.Tag1}, value=Variant{value=11868}
OpcUaUtils.writeStrToTxt("item="
+ item.getReadValueId().getNodeId() +
", value="
+ dataValue.getValue());
NodeId tagId = item.getReadValueId().getNodeId();
devicesByTags.getOrDefault(tagId,
Collections.emptyList()).forEach(
device -> {
device.updateTag(tagId,
dataValue);
List<KvEntry> attributes = device.getAffectedAttributes(tagId,
dataValue);
if (attributes.size() >
0) {
gateway.onDeviceAttributesUpdate(device.getDeviceName(),
attributes);
}
List<TsKvEntry> timeseries = device.getAffectedTimeseries(tagId,
dataValue);
if (timeseries.size() >
0) {
gateway.onDeviceTelemetry(device.getDeviceName(),
timeseries);
}
}
);
}
……………………………..
public void
setMonitoredItems()
throws
InterruptedException,
ExecutionException,
IOException {
UaSubscription subscription = client.getSubscriptionManager().createSubscription(1000.0).get(); // ????????????? just waiting here and can not continue
try{
NodeId nodeId = new
NodeId(2,
"通道1.设备1.目标1");
Map<String,
NodeId> newTags =
new
HashMap<>();
newTags.put("目标1",
nodeId);
List<MonitoredItemCreateRequest> requests =
new
ArrayList<>();
for (Map.Entry<String,
NodeId> kv : newTags.entrySet()) {
// subscribe to the Value attribute of the server's CurrentTime node
ReadValueId readValueId =
new
ReadValueId(
kv.getValue(),
AttributeId.Value.uid(),
null, QualifiedName.NULL_VALUE);
// important: client handle must be unique per item
UInteger clientHandle =
uint(clientHandles.getAndIncrement());
MonitoringParameters parameters =
new
MonitoringParameters(
clientHandle,
1000.0, //
sampling interval
null, // filter, null means use default
uint(10),
// queue size
true // discard oldest
);
requests.add(new
MonitoredItemCreateRequest(
readValueId,
MonitoringMode.Reporting,
parameters));
}
BiConsumer<UaMonitoredItem,
Integer> _onItemCreated_ =
(item,
id) -> item.setValueConsumer(this::onSubscriptionValue);
List<UaMonitoredItem> items = subscription.createMonitoredItems(
TimestampsToReturn.Both,
requests,
onItemCreated
).get();
}catch
(InterruptedException e){
e.printStackTrace();
}catch
(ExecutionException e){
e.printStackTrace();
}
}
}
Class MySubscriptionListener:
@Slf4j
public class
MySubscriptionListener
implements
UaSubscriptionManager.SubscriptionListener{
private
OpcUaServerMonitor
monitor;
public MySubscriptionListener(OpcUaServerMonitor monitor){
this.monitor
= monitor;
}
private void
onSubscriptionValue(UaMonitoredItem item,
DataValue dataValue) {
log.info("#Subscription
value received: item={}, value={}",
item.getReadValueId().getNodeId(),
dataValue.getValue());
//
将接收到的数据记录到TXT文件中,示例format:2019.08.20
12:00:00 item=NodeId{ns=2, id=Simulator1.Device1.Tag1}, value=Variant{value=11868}
OpcUaUtils.writeStrToTxt("item="
+ item.getReadValueId().getNodeId() +
", value="
+ dataValue.getValue());
}
//
重连之后,重新进行tag订阅
@Override
public void
onSubscriptionTransferFailed(UaSubscription subscription,
StatusCode statusCode) {
try{
monitor.setMonitoredItems();
}catch
(InterruptedException e){
e.printStackTrace();
}catch
(ExecutionException e){
e.printStackTrace();
}catch
(IOException e){
e.printStackTrace();
}
}
}
- I have implemented the method onSubscriptionTransferFailed just as in my problem 1. The following two scenes I got different results
- I cut down the net connection between opcua client and server, but they still keep running. Then I recovery the net connection.
Result: Client can reconnect successfully and also can call the method onSubscriptionTransferFailed. The result is normal.
- The net connection between client and server is normal, and then I stop the server and restart it.
Result: the client can reconnect to the server but can not trigger the call of method onSubscriptionTransferFailed.
Thanks so much!
Regards,
Tony
From: milo-dev-bounces@xxxxxxxxxxx <milo-dev-bounces@xxxxxxxxxxx>
On Behalf Of Kevin Herron
Sent: Monday, August 26, 2019 08:04 PM
To: milo developer discussions <milo-dev@xxxxxxxxxxx>
Subject: Re: [milo-dev] what is the best practice of build a opc ua server with milo?
@shigeru,
I don't think you need to check the StatusCode, it's mostly informative. You should re-create the subscription regardless of what the StatusCode will be. You're currently not handling the case where the server doesn't support the subscription
transfer service at all, in which case the StatusCode could be any of:
Bad_NotImplemented
Bad_NotSupported
Bad_OutOfService
Bad_ServiceUnsupported
The StatusCode received as a parameter is ultimately whatever the server sent back at either the operation level or service level, and it can differ because not all servers are implemented the same way.
As of 0.3.0 Milo client automatically has a heartbeat between client and server.
All of the state machine in OPC UA Part 4, Section 6.5 is implemented by the client SDK, including subscription transfers. You don't need to do anything to make the heartbeat work or make subscription transfers take place.
You do need to implement the SubscriptionListener.onSubscriptionTransferFailed() method, though, and re-create the subscription when that method is invoked. This happens when a reconnect was successful but subscription transfer failed for
some reason (often the server doesn't support it or it no longer has the subscription because e.g. it restarted).
Hi Shigeru,
Where can I find the Part 4: Services and 6.5 Re-establishing connections you mentioned in your last mail?
And I have implemented the method onSubscriptionTransferFailed according to your guiding.
Thanks a lot!
Tony
From:
milo-dev-bounces@xxxxxxxxxxx <milo-dev-bounces@xxxxxxxxxxx>
On Behalf Of shigeru ishida
Sent: Monday, August 26, 2019 03:56 PM
To: milo developer discussions <milo-dev@xxxxxxxxxxx>
Subject: Re: [milo-dev] what is the best practice of build a opc ua server with milo?
For your reference, I implemented onSubscriptionTransferFailed() in the OPC-UA client of my tool,
referring to the following specifications.
OPC Unified Architecture Specification Part 4: Services
- 6.5 Re-establishing connections
Now when the server is restarted, the client continues processing.
client.getSubscriptionManager().addSubscriptionListener(new
MySubscriptionListener());
public class MySubscriptionListener implements UaSubscriptionManager.SubscriptionListener {
@Override
public void onSubscriptionTransferFailed(UaSubscription subscription, StatusCode statusCode) {
if ((statusCode.getValue() == StatusCodes.Bad_SubscriptionIdInvalid) ||
(statusCode.getValue() == StatusCodes.Bad_MessageNotAvailable)) {
.....
List<UaMonitoredItem> items = subscription.createMonitoredItems(.....).get();
.....
.....
}
}
}
Hi,
If heartbeat exists between OPCUA client and server in milo? I have a problem about reconnection mechanism that when the OPC UA server does not work and normal again, how does the
client reconnect to OPCUA server?
Thanks!
Regards,
Tony
From:
milo-dev-bounces@xxxxxxxxxxx <milo-dev-bounces@xxxxxxxxxxx>
On Behalf Of Kevin Herron
Sent: Saturday, August 24, 2019 07:29 PM
To: milo developer discussions <milo-dev@xxxxxxxxxxx>
Subject: Re: [milo-dev] what is the best practice of build a opc ua server with milo?
I'm not sure there's any reason to implement your own NodeManager.
It's not possible to implement your own SessionManager and plug that into the SDK right now.
actually, I think I should implement all the methods, because the data can be modified by other systems. Browse,
Read, Write, Subscription.. All of them should be implemented. That is plenty of work. And I found I should create a subclass of ManagedNamespace, and implement my own NodeManager and SessionManager. I don't know if it is correct.
At 2019-08-23 19:37:49, "Kevin Herron" <kevinherron@xxxxxxxxx>
wrote:
also* used, not always used.
I should mention I've always used a hybrid approach, where I create UaNodes to model everything, but still
override read() and write().
When the attribute is Value I do something special like read from or write to a PLC, but for any other
attribute I just read/write from the UaNode like the managed address space implementation would.
You seem to have a pretty good grasps of the tradeoffs between the 2 approaches.
When there's too much data being modeled to reasonable fit into memory I use approach #1 - override the
read() and write() on AttributeServices interface (which is part of AddressSpaceServices interface). This requires more work but allows you to implement reads/writes in such a way that you never even have to have an instance in memory of UaNode for the NodeId
being read from or written to.
In fact, you'll notice all of the APIs you implement from AddressSpaceServices allow for this possibility.
The SDK was carefully designed to minimize the amount of *required* nodes you keep in memory.
This makes things more difficult to implement though, which is where approach #2 comes in - use UaNodes
for everything and let the ManagedAddressSpace implementation handle reads and writes against the nodes. When you do this approach you have to either install a bunch of AttributeDelegate (or AttributeFilter in 0.4 branch, which replaces deprecated AttributeDelegate),
or handle syncing data from your external system to the UaNode instances periodically so that the data is refreshed.
Unfortunately there's no public example of approach #1 demonstrating a "real life" system, but I'm hoping
to have one by the end of the year.
I am working on a poc of opc ua server demo with milo. I have a backend system that is already running,
and I want to make it an opc ua server. Other opc ua clients can commucate with this server using services defined in opc ua spec such as addNodes, read, write, browse, query, call method, createSubcription...
I encounts some problems, and I have some questions:
1. when receives `read` requests from clients, how the server obtains data from my existing backend system?
Choice-1, with implementing some interfaces or overriding some methods, when the server receives `read` requests from clients, it can invoke overriding methods to read real data from backend system. I tried it, but found it difficulty. Did I find a wrong way?
Choice-2, Read the data from backend system to the ua server's memory with `write`, then no methods would be overrided. In this case, all data would be loaded into memory.
2. If the best practice is choice-1, what should I do? What interfaces should be implemented?
3. If the best practice is choice-2, huge data would be the new problem. It is not a good idea to load
them all into the memory.
4. If all data is loaded into memory, when original data changes, sync will be another problem.
_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev
_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev
_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev
_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev
_______________________________________________
milo-dev mailing list
milo-dev@xxxxxxxxxxx
To change your delivery options, retrieve your password, or unsubscribe from this list, visit
https://www.eclipse.org/mailman/listinfo/milo-dev