"""
parse xml file from opcua-spec
"""
import logging
from pytz import utc
import uuid
import re
import sys
import base64
import xml.etree.ElementTree as ET
from opcua.common import ua_utils
from opcua import ua
[docs]def ua_type_to_python(val, uatype_as_str):
"""
Converts a string value to a python value according to ua_utils.
"""
return ua_utils.string_to_val(val, getattr(ua.VariantType, uatype_as_str))
def _to_bool(val):
"""
Easy access to boolean conversion.
"""
return ua_type_to_python(val, "Boolean")
[docs]class NodeData(object):
def __init__(self):
self.nodetype = None
self.nodeid = None
self.browsename = None
self.displayname = None
self.symname = None # FIXME: this param is never used, why?
self.parent = None
self.parentlink = None
self.desc = ""
self.typedef = None
self.refs = []
self.nodeclass = None
self.eventnotifier = 0
# variable
self.datatype = None
self.rank = -1 # check default value
self.value = None
self.valuetype = None
self.dimensions = None
self.accesslevel = None
self.useraccesslevel = None
self.minsample = None
# referencetype
self.inversename = ""
self.abstract = False
self.symmetric = False
# datatype
self.definition = []
def __str__(self):
return "NodeData(nodeid:{0})".format(self.nodeid)
__repr__ = __str__
[docs]class RefStruct(object):
def __init__(self):
self.reftype = None
self.forward = True
self.target = None
[docs]class ExtObj(object):
def __init__(self):
self.typeid = None
self.objname = None
self.bodytype = None
self.body = {}
def __str__(self):
return "ExtObj({0}, {1})".format(self.objname, self.body)
__repr__ = __str__
[docs]class XMLParser(object):
def __init__(self, xmlpath=None, xmlstring=None):
self.logger = logging.getLogger(__name__)
self._retag = re.compile(r"(\{.*\})(.*)")
self.path = xmlpath
if xmlstring:
self.root = ET.fromstring(xmlstring)
else:
self.root = ET.parse(xmlpath).getroot()
# FIXME: hard to get these xml namespaces with ElementTree, we may have to shift to lxml
self.ns = {
'base': "http://opcfoundation.org/UA/2011/03/UANodeSet.xsd",
'uax': "http://opcfoundation.org/UA/2008/02/Types.xsd",
'xsd': "http://www.w3.org/2001/XMLSchema",
'xsi': "http://www.w3.org/2001/XMLSchema-instance"
}
[docs] def get_used_namespaces(self):
"""
Return the used namespace uris in this import file
"""
namespaces_uris = []
for child in self.root:
tag = self._retag.match(child.tag).groups()[1]
if tag == 'NamespaceUris':
namespaces_uris = [ns_element.text for ns_element in child]
break
return namespaces_uris
[docs] def get_aliases(self):
"""
Return the used node aliases in this import file
"""
aliases = {}
for child in self.root:
tag = self._retag.match(child.tag).groups()[1]
if tag == 'Aliases':
for el in child:
aliases[el.attrib["Alias"]] = el.text
break
return aliases
[docs] def get_node_datas(self):
nodes = []
for child in self.root:
tag = self._retag.match(child.tag).groups()[1]
if tag not in ["Aliases", "NamespaceUris", "Extensions", "Models"]: # these XML tags don't contain nodes
node = self._parse_node(tag, child)
nodes.append(node)
return nodes
def _parse_node(self, nodetype, child):
"""
Parse a XML node and create a NodeData object.
"""
obj = NodeData()
obj.nodetype = nodetype
for key, val in child.attrib.items():
self._set_attr(key, val, obj)
self.logger.info("Parsing node: %s %s", obj.nodeid, obj.browsename)
obj.displayname = obj.browsename # give a default value to display name
for el in child:
self._parse_attr(el, obj)
return obj
def _set_attr(self, key, val, obj):
if key == "NodeId":
obj.nodeid = val
elif key == "BrowseName":
obj.browsename = val
elif key == "SymbolicName":
obj.symname = val
elif key == "ParentNodeId":
obj.parent = val
elif key == "DataType":
obj.datatype = val
elif key == "IsAbstract":
obj.abstract = _to_bool(val)
elif key == "Executable":
obj.executable = _to_bool(val)
elif key == "EventNotifier":
obj.eventnotifier = int(val)
elif key == "ValueRank":
obj.rank = int(val)
elif key == "ArrayDimensions":
obj.dimensions = [int(i) for i in val.split(",")]
elif key == "MinimumSamplingInterval":
obj.minsample = int(val)
elif key == "AccessLevel":
obj.accesslevel = int(val)
elif key == "UserAccessLevel":
obj.useraccesslevel = int(val)
elif key == "Symmetric":
obj.symmetric = _to_bool(val)
else:
self.logger.info("Attribute not implemented: %s:%s", key, val)
def _parse_attr(self, el, obj):
tag = self._retag.match(el.tag).groups()[1]
if tag == "DisplayName":
obj.displayname = el.text
elif tag == "Description":
obj.desc = el.text
elif tag == "References":
self._parse_refs(el, obj)
elif tag == "Value":
self._parse_contained_value(el, obj)
elif tag == "InverseName":
obj.inversename = el.text
elif tag == "Definition":
for field in el:
obj.definition.append(field)
else:
self.logger.info("Not implemented tag: %s", el)
def _parse_contained_value(self, el, obj):
"""
Parse the child of el as a constant.
"""
val_el = el.find(".//") # should be only one child
self._parse_value(val_el, obj)
def _parse_value(self, val_el, obj):
"""
Parse the node val_el as a constant.
"""
if val_el is not None and val_el.text is not None:
ntag = self._retag.match(val_el.tag).groups()[1]
else:
ntag = "Null"
obj.valuetype = ntag
if ntag == "Null":
obj.value = None
elif hasattr(ua.ua_binary.Primitives1, ntag):
# Elementary types have their parsing directly relying on ua_type_to_python.
obj.value = ua_type_to_python(val_el.text, ntag)
elif ntag == "DateTime":
obj.value = ua_type_to_python(val_el.text, ntag)
# According to specs, DateTime should be either UTC or with a timezone.
if obj.value.tzinfo is None or obj.value.tzinfo.utcoffset(obj.value) is None:
utc.localize(obj.value) # FIXME Forcing to UTC if unaware, maybe should raise?
elif ntag == "ByteString":
if val_el.text is None:
mytext = b""
else:
mytext = val_el.text.encode()
mytext = base64.b64decode(mytext)
obj.value = mytext
elif ntag == "String":
mytext = val_el.text
if mytext is None:
# Support importing null strings.
mytext = ""
#mytext = mytext.replace('\n', '').replace('\r', '')
obj.value = mytext
elif ntag == "Guid":
self._parse_contained_value(val_el, obj)
# Override parsed string type to guid.
obj.valuetype = ntag
elif ntag == "NodeId":
id_el = val_el.find("uax:Identifier", self.ns)
if id_el is not None:
obj.value = id_el.text
elif ntag == "ExtensionObject":
obj.value = self._parse_ext_obj(val_el)
elif ntag == "LocalizedText":
obj.value = self._parse_body(val_el)
elif ntag == "ListOfLocalizedText":
obj.value = self._parse_list_of_localized_text(val_el)
elif ntag == "ListOfExtensionObject":
obj.value = self._parse_list_of_extension_object(val_el)
elif ntag.startswith("ListOf"):
# Default case for "ListOf" types.
# Should stay after particular cases (e.g.: "ListOfLocalizedText").
obj.value = []
for val_el in val_el:
tmp = NodeData()
self._parse_value(val_el, tmp)
obj.value.append(tmp.value)
else:
# Missing according to string_to_val: XmlElement, ExpandedNodeId,
# QualifiedName, StatusCode.
# Missing according to ua.VariantType (also missing in string_to_val):
# DataValue, Variant, DiagnosticInfo.
self.logger.warning("Parsing value of type '%s' not implemented", ntag)
def _get_text(self, el):
txtlist = [txt.strip() for txt in el.itertext()]
return "".join(txtlist)
def _parse_list_of_localized_text(self, el):
value = []
for localized_text in el:
mylist = self._parse_body(localized_text)
# small hack since we did not handle LocalizedText as ExtensionObject at begynning
for name, val in mylist:
if name == "Text":
value.append(val)
return value
def _parse_list_of_extension_object(self, el):
"""
Parse a uax:ListOfExtensionObject Value
Return an list of ExtObj
"""
value = []
for extension_object in el:
ext_obj = self._parse_ext_obj(extension_object)
value.append(ext_obj)
return value
def _parse_ext_obj(self, el):
ext = ExtObj()
for extension_object_part in el:
ntag = self._retag.match(extension_object_part.tag).groups()[1]
if ntag == 'TypeId':
ntag = self._retag.match(extension_object_part.find('*').tag).groups()[1]
ext.typeid = self._get_text(extension_object_part)
elif ntag == 'Body':
ext.objname = self._retag.match(extension_object_part.find('*').tag).groups()[1]
ext.body = self._parse_body(extension_object_part)
else:
self.logger.warning("Unknown ntag", ntag)
return ext
def _parse_body(self, el):
body = []
for body_item in el:
otag = self._retag.match(body_item.tag).groups()[1]
childs = [i for i in body_item]
if not childs:
val = self._get_text(body_item)
else:
val = self._parse_body(body_item)
if val:
body.append((otag, val))
return body
def _parse_refs(self, el, obj):
parent, parentlink = obj.parent, None
for ref in el:
struct = RefStruct()
struct.forward = "IsForward" not in ref.attrib or ref.attrib["IsForward"] not in ("false", "False")
struct.target = ref.text
struct.reftype = ref.attrib["ReferenceType"]
obj.refs.append(struct)
if ref.attrib["ReferenceType"] == "HasTypeDefinition":
obj.typedef = ref.text
elif not struct.forward:
parent, parentlink = struct.target, struct.reftype
if obj.parent == parent:
obj.parentlink = parentlink
if not obj.parent or not obj.parentlink:
obj.parent, obj.parentlink = parent, parentlink
self.logger.info("Could not detect backward reference to parent for node '%s'", obj.nodeid)