Source code for opcua.common.manage_nodes

"""
High level functions to create nodes
"""
from opcua import ua
from opcua.common import node
from opcua.common.instantiate import instantiate


def _parse_nodeid_qname(*args):
    try:
        if isinstance(args[0], int):
            nodeid = ua.NodeId(0, int(args[0]))
            qname = ua.QualifiedName(args[1], int(args[0]))
            return nodeid, qname
        if isinstance(args[0], ua.NodeId):
            nodeid = args[0]
        elif isinstance(args[0], str):
            nodeid = ua.NodeId.from_string(args[0])
        else:
            raise RuntimeError()
        if isinstance(args[1], ua.QualifiedName):
            qname = args[1]
        elif isinstance(args[1], str):
            qname = ua.QualifiedName.from_string(args[1])
        else:
            raise RuntimeError()
        return nodeid, qname
    except ua.UaError:
        raise
    except Exception as ex:
        raise TypeError("This method takes either a namespace index and a string as argument or a nodeid and a qualifiedname. Received arguments {0} and got exception {1}".format(args, ex))


[docs]def create_folder(parent, nodeid, bname): """ create a child node folder arguments are nodeid, browsename or namespace index, name """ nodeid, qname = _parse_nodeid_qname(nodeid, bname) return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.FolderType))
[docs]def create_object(parent, nodeid, bname, objecttype=None): """ create a child node object arguments are nodeid, browsename, [objecttype] or namespace index, name, [objecttype] if objectype is given (a NodeId) then the type node is instantiated inclusive its child nodes """ nodeid, qname = _parse_nodeid_qname(nodeid, bname) if objecttype is not None: objecttype = node.Node(parent.server, objecttype) dname = ua.LocalizedText(qname.Name) nodes = instantiate(parent, objecttype, nodeid, bname=qname, dname=dname)[0] return nodes else: return node.Node(parent.server, _create_object(parent.server, parent.nodeid, nodeid, qname, ua.ObjectIds.BaseObjectType))
[docs]def create_property(parent, nodeid, bname, val, varianttype=None, datatype=None): """ create a child node property args are nodeid, browsename, value, [variant type] or idx, name, value, [variant type] """ nodeid, qname = _parse_nodeid_qname(nodeid, bname) var = ua.Variant(val, varianttype) if datatype and isinstance(datatype, int): datatype = ua.NodeId(datatype, 0) if datatype and not isinstance(datatype, ua.NodeId): raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid") return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=True))
[docs]def create_variable(parent, nodeid, bname, val, varianttype=None, datatype=None): """ create a child node variable args are nodeid, browsename, value, [variant type], [data type] or idx, name, value, [variant type], [data type] """ nodeid, qname = _parse_nodeid_qname(nodeid, bname) var = ua.Variant(val, varianttype) if datatype and isinstance(datatype, int): datatype = ua.NodeId(datatype, 0) if datatype and not isinstance(datatype, ua.NodeId): raise RuntimeError("datatype argument must be a nodeid or an int refering to a nodeid") return node.Node(parent.server, _create_variable(parent.server, parent.nodeid, nodeid, qname, var, datatype=datatype, isproperty=False))
[docs]def create_variable_type(parent, nodeid, bname, datatype): """ Create a new variable type args are nodeid, browsename and datatype or idx, name and data type """ nodeid, qname = _parse_nodeid_qname(nodeid, bname) if datatype and isinstance(datatype, int): datatype = ua.NodeId(datatype, 0) if datatype and not isinstance(datatype, ua.NodeId): raise RuntimeError("Data type argument must be a nodeid or an int refering to a nodeid, received: {}".format(datatype)) return node.Node(parent.server, _create_variable_type(parent.server, parent.nodeid, nodeid, qname, datatype))
[docs]def create_reference_type(parent, nodeid, bname, symmetric=True, inversename=None): """ Create a new reference type args are nodeid and browsename or idx and name """ nodeid, qname = _parse_nodeid_qname(nodeid, bname) return node.Node(parent.server, _create_reference_type(parent.server, parent.nodeid, nodeid, qname, symmetric, inversename))
[docs]def create_object_type(parent, nodeid, bname): """ Create a new object type to be instanciated in address space. arguments are nodeid, browsename or namespace index, name """ nodeid, qname = _parse_nodeid_qname(nodeid, bname) return node.Node(parent.server, _create_object_type(parent.server, parent.nodeid, nodeid, qname))
[docs]def create_method(parent, *args): """ create a child method object This is only possible on server side!! args are nodeid, browsename, method_to_be_called, [input argument types], [output argument types] or idx, name, method_to_be_called, [input argument types], [output argument types] if argument types is specified, child nodes advertising what arguments the method uses and returns will be created a callback is a method accepting the nodeid of the parent as first argument and variants after. returns a list of variants """ nodeid, qname = _parse_nodeid_qname(*args[:2]) callback = args[2] if len(args) > 3: inputs = args[3] else: inputs = [] if len(args) > 4: outputs = args[4] else: outputs = [] return node.Node(parent.server, _create_method(parent, nodeid, qname, callback, inputs, outputs))
def _create_object(server, parentnodeid, nodeid, qname, objecttype): addnode = ua.AddNodesItem() addnode.RequestedNewNodeId = nodeid addnode.BrowseName = qname addnode.ParentNodeId = parentnodeid if node.Node(server, parentnodeid).get_type_definition() == ua.NodeId(ua.ObjectIds.FolderType): addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.Organizes) else: addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent) addnode.NodeClass = ua.NodeClass.Object if isinstance(objecttype, int): addnode.TypeDefinition = ua.NodeId(objecttype) else: addnode.TypeDefinition = objecttype attrs = ua.ObjectAttributes() attrs.EventNotifier = 0 attrs.Description = ua.LocalizedText(qname.Name) attrs.DisplayName = ua.LocalizedText(qname.Name) attrs.WriteMask = 0 attrs.UserWriteMask = 0 addnode.NodeAttributes = attrs results = server.add_nodes([addnode]) results[0].StatusCode.check() return results[0].AddedNodeId def _create_reference_type(server, parentnodeid, nodeid, qname, symmetric, inversename): addnode = ua.AddNodesItem() addnode.RequestedNewNodeId = nodeid addnode.BrowseName = qname addnode.NodeClass = ua.NodeClass.ReferenceType addnode.ParentNodeId = parentnodeid addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype) attrs = ua.ReferenceTypeAttributes() attrs.IsAbstract = False attrs.Description = ua.LocalizedText(qname.Name) attrs.DisplayName = ua.LocalizedText(qname.Name) attrs.Symmetric = symmetric attrs.InverseName = ua.LocalizedText(inversename) attrs.UserWriteMask = 0 addnode.NodeAttributes = attrs results = server.add_nodes([addnode]) results[0].StatusCode.check() return results[0].AddedNodeId def _create_object_type(server, parentnodeid, nodeid, qname): addnode = ua.AddNodesItem() addnode.RequestedNewNodeId = nodeid addnode.BrowseName = qname addnode.ParentNodeId = parentnodeid addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype) addnode.NodeClass = ua.NodeClass.ObjectType attrs = ua.ObjectTypeAttributes() attrs.IsAbstract = False attrs.Description = ua.LocalizedText(qname.Name) attrs.DisplayName = ua.LocalizedText(qname.Name) attrs.WriteMask = 0 attrs.UserWriteMask = 0 addnode.NodeAttributes = attrs results = server.add_nodes([addnode]) results[0].StatusCode.check() return results[0].AddedNodeId def _create_variable(server, parentnodeid, nodeid, qname, var, datatype=None, isproperty=False): addnode = ua.AddNodesItem() addnode.RequestedNewNodeId = nodeid addnode.BrowseName = qname addnode.NodeClass = ua.NodeClass.Variable addnode.ParentNodeId = parentnodeid if isproperty: addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasProperty) addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.PropertyType) else: addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent) addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) attrs = ua.VariableAttributes() attrs.Description = ua.LocalizedText(qname.Name) attrs.DisplayName = ua.LocalizedText(qname.Name) if datatype: attrs.DataType = datatype else: attrs.DataType = _guess_datatype(var) attrs.Value = var if not isinstance(var.Value, (list, tuple)): attrs.ValueRank = ua.ValueRank.Scalar else: if var.Dimensions: attrs.ValueRank = len(var.Dimensions) attrs.ArrayDimensions = var.Dimensions attrs.WriteMask = 0 attrs.UserWriteMask = 0 attrs.Historizing = False attrs.AccessLevel = ua.AccessLevel.CurrentRead.mask attrs.UserAccessLevel = ua.AccessLevel.CurrentRead.mask addnode.NodeAttributes = attrs results = server.add_nodes([addnode]) results[0].StatusCode.check() return results[0].AddedNodeId def _create_variable_type(server, parentnodeid, nodeid, qname, datatype, value=None): addnode = ua.AddNodesItem() addnode.RequestedNewNodeId = nodeid addnode.BrowseName = qname addnode.NodeClass = ua.NodeClass.VariableType addnode.ParentNodeId = parentnodeid addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype) #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) attrs = ua.VariableTypeAttributes() attrs.Description = ua.LocalizedText(qname.Name) attrs.DisplayName = ua.LocalizedText(qname.Name) attrs.DataType = datatype attrs.IsAbstract = False if value: attrs.Value = value if isinstance(value, (list, tuple)): attrs.ValueRank = ua.ValueRank.OneDimension else: attrs.ValueRank = ua.ValueRank.Scalar #attrs.ArrayDimensions = None attrs.WriteMask = 0 attrs.UserWriteMask = 0 addnode.NodeAttributes = attrs results = server.add_nodes([addnode]) results[0].StatusCode.check() return results[0].AddedNodeId
[docs]def create_data_type(parent, nodeid, bname, description=None): """ Create a new data type to be used in new variables, etc .. arguments are nodeid, browsename or namespace index, name """ nodeid, qname = _parse_nodeid_qname(nodeid, bname) addnode = ua.AddNodesItem() addnode.RequestedNewNodeId = nodeid addnode.BrowseName = qname addnode.NodeClass = ua.NodeClass.DataType addnode.ParentNodeId = parent.nodeid addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasSubtype) #addnode.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseDataVariableType) # No type definition for types attrs = ua.DataTypeAttributes() if description is None: attrs.Description = ua.LocalizedText(qname.Name) else: attrs.Description = ua.LocalizedText(description) attrs.DisplayName = ua.LocalizedText(qname.Name) attrs.WriteMask = 0 attrs.UserWriteMask = 0 attrs.IsAbstract = False # True mean they cannot be instanciated addnode.NodeAttributes = attrs results = parent.server.add_nodes([addnode]) results[0].StatusCode.check() return node.Node(parent.server, results[0].AddedNodeId)
def _create_method(parent, nodeid, qname, callback, inputs, outputs): addnode = ua.AddNodesItem() addnode.RequestedNewNodeId = nodeid addnode.BrowseName = qname addnode.NodeClass = ua.NodeClass.Method addnode.ParentNodeId = parent.nodeid addnode.ReferenceTypeId = ua.NodeId(ua.ObjectIds.HasComponent) #node.TypeDefinition = ua.NodeId(ua.ObjectIds.BaseObjectType) attrs = ua.MethodAttributes() attrs.Description = ua.LocalizedText(qname.Name) attrs.DisplayName = ua.LocalizedText(qname.Name) attrs.WriteMask = 0 attrs.UserWriteMask = 0 attrs.Executable = True attrs.UserExecutable = True addnode.NodeAttributes = attrs results = parent.server.add_nodes([addnode]) results[0].StatusCode.check() method = node.Node(parent.server, results[0].AddedNodeId) if inputs: create_property(method, ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex), ua.QualifiedName("InputArguments", 0), [_vtype_to_argument(vtype) for vtype in inputs], varianttype=ua.VariantType.ExtensionObject, datatype=ua.ObjectIds.Argument) if outputs: create_property(method, ua.NodeId(namespaceidx=method.nodeid.NamespaceIndex), ua.QualifiedName("OutputArguments", 0), [_vtype_to_argument(vtype) for vtype in outputs], varianttype=ua.VariantType.ExtensionObject, datatype=ua.ObjectIds.Argument) if hasattr(parent.server, "add_method_callback"): parent.server.add_method_callback(method.nodeid, callback) return results[0].AddedNodeId def _vtype_to_argument(vtype): if isinstance(vtype, ua.Argument): return vtype arg = ua.Argument() if isinstance(vtype, ua.VariantType): arg.DataType = ua.NodeId(vtype.value) else: arg.DataType = ua.NodeId(vtype) return arg def _guess_datatype(variant): if variant.VariantType == ua.VariantType.ExtensionObject: if variant.Value is None: raise ua.UaError("Cannot guess DataType from Null ExtensionObject") if type(variant.Value) in (list, tuple): if len(variant.Value) == 0: raise ua.UaError("Cannot guess DataType from Null ExtensionObject") extobj = variant.Value[0] else: extobj = variant.Value classname = extobj.__class__.__name__ return ua.NodeId(getattr(ua.ObjectIds, classname)) else: return ua.NodeId(getattr(ua.ObjectIds, variant.VariantType.name))
[docs]def delete_nodes(server, nodes, recursive=False, delete_target_references=True): """ Delete specified nodes. Optionally delete recursively all nodes with a downward hierachic references to the node return the list of deleted node and the result """ nodestodelete = [] if recursive: nodes = _add_childs(nodes) for mynode in nodes: it = ua.DeleteNodesItem() it.NodeId = mynode.nodeid it.DeleteTargetReferences = delete_target_references nodestodelete.append(it) params = ua.DeleteNodesParameters() params.NodesToDelete = nodestodelete return nodes, server.delete_nodes(params)
def _add_childs(nodes): results = [] for mynode in nodes: results += _add_childs(mynode.get_children()) results += [mynode] return results