"""
Add nodes defined in an XML file to an address space.
The XML format is the one defined by the OPC-UA specification.
"""
import logging
import uuid
from copy import copy
import opcua
from opcua import ua
from opcua.common import xmlparser
from opcua.ua.uaerrors import UaError
import sys
if sys.version_info.major > 2:
    # Python 3 has no ``unicode`` builtin; alias it to ``str`` so that
    # ``isinstance(val, (str, unicode))`` checks work on both major versions.
    unicode = str
class XmlImporter(object):
    """
    Import the nodes described in an XML file into an address space.

    ``server`` must provide ``register_namespace``. Nodes and references are
    added through ``server.iserver.isession`` when ``server`` is an
    ``opcua.server.server.Server`` instance and through ``server.uaclient``
    otherwise.
    """

    # xml node tag -> name of the method that adds this kind of node
    _NODE_ADDERS = {
        'UAObject': 'add_object',
        'UAObjectType': 'add_object_type',
        'UAVariable': 'add_variable',
        'UAVariableType': 'add_variable_type',
        'UAReferenceType': 'add_reference_type',
        'UADataType': 'add_datatype',
        'UAMethod': 'add_method',
    }

    def __init__(self, server):
        self.logger = logging.getLogger(__name__)
        self.parser = None
        self.server = server
        # namespace index used in the xml file -> namespace index on the server
        self.namespaces = {}
        # alias name -> NodeId with its namespace index already migrated
        self.aliases = {}
        # alias name -> raw alias value as returned by the parser
        self._unmapped_aliases = {}
        # references that could not be added yet; set to a list by import_xml()
        self.refs = None

    def _map_namespaces(self, namespaces_uris):
        """
        creates a mapping between the namespaces in the xml file and in the server.
        if not present the namespace is registered.

        :param namespaces_uris: namespace uris in the order used by the xml
            file; the first uri gets xml index 1
        :returns: dict {xml namespace index: server namespace index}
        """
        namespaces = {}
        for xml_index, ns_uri in enumerate(namespaces_uris, 1):
            namespaces[xml_index] = self.server.register_namespace(ns_uri)
        return namespaces

    def _map_aliases(self, aliases):
        """
        maps the import aliases to the correct namespaces

        :param aliases: dict {alias name: node id as found in the xml file}
        :returns: dict {alias name: NodeId with migrated namespace index}
        """
        return {alias: self.to_nodeid(node_id) for alias, node_id in aliases.items()}

    def import_xml(self, xmlpath=None, xmlstring=None):
        """
        import xml and return added nodes

        :param xmlpath: passed to xmlparser.XMLParser as first argument
        :param xmlstring: passed to xmlparser.XMLParser as second argument
        :returns: list of the node ids returned by the server for the added nodes
        :raises: any exception raised while adding a node is logged and re-raised
        """
        self.logger.info("Importing XML file %s", xmlpath)
        self.parser = xmlparser.XMLParser(xmlpath, xmlstring)

        self.namespaces = self._map_namespaces(self.parser.get_used_namespaces())
        self._unmapped_aliases = self.parser.get_aliases()
        self.aliases = self._map_aliases(self._unmapped_aliases)
        self.refs = []

        dnodes = self.make_objects(self.parser.get_node_datas())
        nodes_parsed = self._sort_nodes_by_parentid(dnodes)

        nodes = []
        for nodedata in nodes_parsed:
            try:
                node = self._add_node_data(nodedata)
            except Exception:
                self.logger.warning("failure adding node %s", nodedata)
                raise
            # None means the node type is not supported; it was skipped with a warning
            if node is not None:
                nodes.append(node)

        # references rejected while the nodes were being added are tried once
        # more now that every node of the file has been processed
        self.refs, remaining_refs = [], self.refs
        self._add_references(remaining_refs)
        if self.refs:
            self.logger.warning("The following references could not be imported and are probaly broken: %s", self.refs)
        return nodes

    def _add_node_data(self, nodedata):
        """
        Add one parsed node using the method matching its node type.

        :returns: the node id returned by the add_* method, or None when the
            node type is not supported (a warning is logged in that case)
        """
        adder_name = self._NODE_ADDERS.get(nodedata.nodetype)
        if adder_name is None:
            self.logger.warning("Not implemented node type: %s ", nodedata.nodetype)
            return None
        return getattr(self, adder_name)(nodedata)

    def _get_session(self):
        """
        Return the object used to add nodes and references: the internal
        session for a Server instance, the ``uaclient`` attribute otherwise.
        """
        if isinstance(self.server, opcua.server.server.Server):
            return self.server.iserver.isession
        return self.server.uaclient

    def _add_node(self, node):
        """Send a single AddNodesItem and return the list of results."""
        return self._get_session().add_nodes([node])

    def _add_references(self, refs):
        """
        Send ``refs`` and remember in ``self.refs`` every reference whose
        status code is not good.
        """
        res = self._get_session().add_references(refs)
        for sc, ref in zip(res, refs):
            if not sc.is_good():
                self.refs.append(ref)

    def make_objects(self, node_datas):
        """
        Convert, in place, the string fields of the parsed node datas into ua
        objects (nodeid, browsename, parent, parentlink, typedef).

        :returns: list of the converted node datas, in the same order
        """
        new_nodes = []
        for ndata in node_datas:
            ndata.nodeid = ua.NodeId.from_string(ndata.nodeid)
            ndata.browsename = ua.QualifiedName.from_string(ndata.browsename)
            if ndata.parent:
                ndata.parent = ua.NodeId.from_string(ndata.parent)
            if ndata.parentlink:
                ndata.parentlink = self._to_nodeid(ndata.parentlink)
            if ndata.typedef:
                ndata.typedef = self._to_nodeid(ndata.typedef)
            new_nodes.append(ndata)
        return new_nodes

    def _migrate_ns(self, nodeid):
        """
        Check if the index of nodeid or browsename given in the xml model file
        must be converted to a already existing namespace id based on the files
        namespace uri

        :param nodeid: any object with a NamespaceIndex attribute
        :returns: a shallow copy carrying the server namespace index when the
            index is part of the xml file mapping, the given object otherwise
        """
        if nodeid.NamespaceIndex in self.namespaces:
            # copy so that the object parsed from the xml file is left untouched
            nodeid = copy(nodeid)
            nodeid.NamespaceIndex = self.namespaces[nodeid.NamespaceIndex]
        return nodeid

    def _get_node(self, obj):
        """
        Build the AddNodesItem common to every node type from a node data:
        requested node id, browse name, node class and, when available,
        parent, parent reference type and type definition.
        """
        node = ua.AddNodesItem()
        node.RequestedNewNodeId = self._migrate_ns(obj.nodeid)
        node.BrowseName = self._migrate_ns(obj.browsename)
        self.logger.info("Importing xml node (%s, %s) as (%s %s)", obj.browsename, obj.nodeid, node.BrowseName, node.RequestedNewNodeId)
        # nodetype is the xml tag, e.g. 'UAObject'; strip the 2 leading characters
        node.NodeClass = getattr(ua.NodeClass, obj.nodetype[2:])
        if obj.parent and obj.parentlink:
            node.ParentNodeId = self._migrate_ns(obj.parent)
            node.ReferenceTypeId = self._migrate_ns(obj.parentlink)
        if obj.typedef:
            node.TypeDefinition = self._migrate_ns(obj.typedef)
        return node

    def _to_nodeid(self, nodeid):
        """
        Convert a value found in the xml file to a NodeId, without namespace
        migration. Accepted values are a NodeId (returned as is), an empty
        value (gives the NodeId of ua.ObjectIds.String), a node id string
        containing '=', a name from ua.ObjectIds or an alias of the xml file.

        :raises AttributeError: if a name is neither in ua.ObjectIds nor an alias
        """
        if isinstance(nodeid, ua.NodeId):
            return nodeid
        if not nodeid:
            return ua.NodeId(ua.ObjectIds.String)
        if "=" in nodeid:
            return ua.NodeId.from_string(nodeid)
        if hasattr(ua.ObjectIds, nodeid):
            return ua.NodeId(getattr(ua.ObjectIds, nodeid))
        if nodeid in self.aliases:
            # resolve from the raw alias so that the caller decides about migration
            return self._to_nodeid(self._unmapped_aliases[nodeid])
        # unknown name: getattr raises AttributeError
        return ua.NodeId(getattr(ua.ObjectIds, nodeid))

    def to_nodeid(self, nodeid):
        """Convert a value found in the xml file to a NodeId valid on the server."""
        return self._migrate_ns(self._to_nodeid(nodeid))

    @staticmethod
    def _set_texts(attrs, obj):
        """Set Description (only if the node has one) and DisplayName on attrs."""
        if obj.desc:
            attrs.Description = ua.LocalizedText(obj.desc)
        attrs.DisplayName = ua.LocalizedText(obj.displayname)

    def _send_node(self, node, attrs, obj):
        """
        Attach attrs to node, add the node, add the references of obj and
        return the added node id.

        The references are sent before the status code is checked, so they
        are attempted even when adding the node failed.
        """
        node.NodeAttributes = attrs
        res = self._add_node(node)
        self._add_refs(obj)
        res[0].StatusCode.check()
        return res[0].AddedNodeId

    def add_object(self, obj):
        """Add an object node and return its node id."""
        node = self._get_node(obj)
        attrs = ua.ObjectAttributes()
        self._set_texts(attrs, obj)
        attrs.EventNotifier = obj.eventnotifier
        return self._send_node(node, attrs, obj)

    def add_object_type(self, obj):
        """Add an object type node and return its node id."""
        node = self._get_node(obj)
        attrs = ua.ObjectTypeAttributes()
        self._set_texts(attrs, obj)
        attrs.IsAbstract = obj.abstract
        return self._send_node(node, attrs, obj)

    def add_variable(self, obj):
        """Add a variable node, including its value if any, and return its node id."""
        node = self._get_node(obj)
        attrs = ua.VariableAttributes()
        self._set_texts(attrs, obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value is not None:
            attrs.Value = self._add_variable_value(obj)
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._send_node(node, attrs, obj)

    def _get_ext_class(self, name):
        """
        Return the class of the extension object called name: an attribute of
        the ua module if there is one, else the class registered for the node
        id of the alias called name.

        :raises Exception: if no alias or no registered class is found
        """
        if hasattr(ua, name):
            return getattr(ua, name)
        if name not in self.aliases:
            raise Exception("Error no alias found for extension class", name)
        nodeid = self.aliases[name]
        class_type = ua.uatypes.get_extensionobject_class_type(nodeid)
        if not class_type:
            raise Exception("Error no extension class registered ", name, nodeid)
        return class_type

    def _make_ext_obj(self, obj):
        """
        Instantiate the extension object class named obj.objname and fill it
        with the (attribute name, value) pairs found in obj.body.
        """
        ext = self._get_ext_class(obj.objname)()
        for name, val in obj.body:
            if not isinstance(val, list):
                raise Exception("Error val should be a list, this is a python-opcua bug", name, type(val), val)
            for attname, v in val:
                self._set_attr(ext, attname, v)
        return ext

    def _get_val_type(self, obj, attname):
        """
        Return the ua type declared for attribute attname in obj.ua_types.

        :raises UaError: if obj declares no attribute called attname
        """
        for name, uatype in obj.ua_types:
            if name == attname:
                return uatype
        raise UaError("Attribute '{}' defined in xml is not found in object '{}'".format(attname, obj))

    def _set_attr(self, obj, attname, val):
        """
        Set attribute attname of obj from a parsed xml value.

        val is either a string, converted according to the declared type of
        the attribute, or a sequence of pairs describing a NodeId, a list or a
        nested object.
        """
        if isinstance(val, (str, unicode)):
            pval = xmlparser.ua_type_to_python(val, self._get_val_type(obj, attname))
            setattr(obj, attname, pval)
            return

        # so we have either an object or a list...
        current = getattr(obj, attname)
        if isinstance(current, ua.NodeId):
            # NodeId representation does not follow common rules!!
            # only the first "Identifier" element is used
            for attname2, v2 in val:
                if attname2 == "Identifier":
                    if hasattr(ua.ObjectIds, v2):
                        current = ua.NodeId(getattr(ua.ObjectIds, v2))
                    else:
                        current = ua.NodeId.from_string(v2)
                    setattr(obj, attname, self._migrate_ns(current))
                    break
        elif not hasattr(current, "ua_types"):
            # we probably have a list of (type name, value) pairs
            setattr(obj, attname, [xmlparser.ua_type_to_python(v2, vtype) for vtype, v2 in val])
        else:
            # nested object: fill its attributes recursively
            for attname2, v2 in val:
                self._set_attr(current, attname2, v2)
            setattr(obj, attname, current)

    def _add_variable_value(self, obj):
        """
        Returns the value for a Variable based on the objects value type.

        :returns: ua.Variant built from obj.value according to obj.valuetype
        """
        self.logger.debug("Setting value with type %s and value %s", obj.valuetype, obj.value)
        valuetype = obj.valuetype
        if valuetype == 'ListOfExtensionObject':
            values = [self._make_ext_obj(ext) for ext in obj.value]
            return ua.Variant(values, ua.VariantType.ExtensionObject)
        if valuetype == 'ListOfGuid':
            return ua.Variant([uuid.UUID(guid) for guid in obj.value], ua.VariantType.Guid)
        if valuetype.startswith("ListOf"):
            vtype = valuetype[6:]  # element type, i.e. the name without "ListOf"
            if hasattr(ua.ua_binary.Primitives, vtype):
                return ua.Variant(obj.value, getattr(ua.VariantType, vtype))
            return ua.Variant([getattr(ua, vtype)(v) for v in obj.value])
        if valuetype == 'ExtensionObject':
            return ua.Variant(self._make_ext_obj(obj.value), ua.VariantType.ExtensionObject)
        if valuetype == 'Guid':
            return ua.Variant(uuid.UUID(obj.value), ua.VariantType.Guid)
        if valuetype == 'LocalizedText':
            ltext = ua.LocalizedText()
            for name, val in obj.value:
                if name == "Text":
                    ltext.Text = val
                else:
                    self.logger.warning("While parsing localizedText value, unkown element: %s with val: %s", name, val)
            return ua.Variant(ltext, ua.VariantType.LocalizedText)
        if valuetype == 'NodeId':
            return ua.Variant(ua.NodeId.from_string(obj.value))
        return ua.Variant(obj.value, getattr(ua.VariantType, valuetype))

    def add_variable_type(self, obj):
        """Add a variable type node and return its node id."""
        node = self._get_node(obj)
        attrs = ua.VariableTypeAttributes()
        self._set_texts(attrs, obj)
        attrs.DataType = self.to_nodeid(obj.datatype)
        if obj.value and len(obj.value) == 1:
            attrs.Value = obj.value[0]
        if obj.rank:
            attrs.ValueRank = obj.rank
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._send_node(node, attrs, obj)

    def add_method(self, obj):
        """Add a method node and return its node id."""
        node = self._get_node(obj)
        attrs = ua.MethodAttributes()
        self._set_texts(attrs, obj)
        # NOTE(review): these four attributes look copied from add_variable;
        # confirm that they are meaningful on ua.MethodAttributes
        if obj.accesslevel:
            attrs.AccessLevel = obj.accesslevel
        if obj.useraccesslevel:
            attrs.UserAccessLevel = obj.useraccesslevel
        if obj.minsample:
            attrs.MinimumSamplingInterval = obj.minsample
        if obj.dimensions:
            attrs.ArrayDimensions = obj.dimensions
        return self._send_node(node, attrs, obj)

    def add_reference_type(self, obj):
        """Add a reference type node and return its node id."""
        node = self._get_node(obj)
        attrs = ua.ReferenceTypeAttributes()
        self._set_texts(attrs, obj)
        if obj.inversename:
            attrs.InverseName = ua.LocalizedText(obj.inversename)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        if obj.symmetric:
            attrs.Symmetric = obj.symmetric
        return self._send_node(node, attrs, obj)

    def add_datatype(self, obj):
        """Add a data type node and return its node id."""
        node = self._get_node(obj)
        attrs = ua.DataTypeAttributes()
        self._set_texts(attrs, obj)
        if obj.abstract:
            attrs.IsAbstract = obj.abstract
        return self._send_node(node, attrs, obj)

    def _add_refs(self, obj):
        """
        Add every reference listed in obj.refs, with obj as source node.
        Does nothing when obj has no references.
        """
        if not obj.refs:
            return
        refs = []
        for data in obj.refs:
            ref = ua.AddReferencesItem()
            ref.IsForward = data.forward
            ref.ReferenceTypeId = self.to_nodeid(data.reftype)
            ref.SourceNodeId = self._migrate_ns(obj.nodeid)
            ref.TargetNodeId = self.to_nodeid(data.target)
            refs.append(ref)
        self._add_references(refs)

    def _sort_nodes_by_parentid(self, ndatas):
        """
        Sort the list of nodes according their parent node in order to respect
        the dependency between nodes.

        A node is emitted as soon as it does not depend on another node of
        the file (its namespace index is not one of the xml file, it has no
        parent, or its parent is not defined in the file) or once its parent
        has been emitted. Nodes that only depend on each other can never
        satisfy this; they are appended at the end in their original order
        and a warning is logged.

        :param ndatas: list of NodeDataObjects
        :returns: list of sorted nodes
        """
        pending = list(ndatas)
        all_node_ids = [data.nodeid for data in pending]
        # ids of the nodes that are already sorted / inserted
        sorted_nodes_ids = []
        sorted_ndatas = []

        while pending:
            still_pending = []
            for ndata in pending:
                # We can only respect ordering for namespace indexes defined
                # in the xml file itself; everything else is assumed to be
                # known to the server already.
                independent = (
                    ndata.nodeid.NamespaceIndex not in self.namespaces
                    or ndata.parent is None
                    or ndata.parent not in all_node_ids
                )
                if independent or ndata.parent in sorted_nodes_ids:
                    sorted_ndatas.append(ndata)
                    sorted_nodes_ids.append(ndata.nodeid)
                else:
                    still_pending.append(ndata)

            if len(still_pending) == len(pending):
                # a whole pass inserted nothing: looping again cannot help
                self.logger.warning("Could not order %s nodes by parent, their parents depend on each other", len(still_pending))
                sorted_ndatas.extend(still_pending)
                break
            pending = still_pending
        return sorted_ndatas