import struct
from opcua.ua import uaprotocol_auto as auto
from opcua.ua import uatypes
from opcua.common import utils
from opcua.ua.uatypes import AccessLevel, FrozenClass
OPC_TCP_SCHEME = 'opc.tcp'
[docs]class Hello(uatypes.FrozenClass):
ua_types = (('ProtocolVersion', 'UInt32'), ('ReceiveBufferSize', 'UInt32'), ('SendBufferSize', 'UInt32'),
('MaxMessageSize', 'UInt32'), ('MaxChunkCount', 'UInt32'), ('EndpointUrl', 'String'), )
def __init__(self):
self.ProtocolVersion = 0
self.ReceiveBufferSize = 2**31 - 1
self.SendBufferSize = 2**31 - 1
self.MaxMessageSize = 0 # No limits
self.MaxChunkCount = 0 # No limits
self.EndpointUrl = ""
self._freeze = True
[docs]class MessageType(object):
Invalid = b"INV" # FIXME: check value
Hello = b"HEL"
Acknowledge = b"ACK"
Error = b"ERR"
SecureOpen = b"OPN"
SecureClose = b"CLO"
SecureMessage = b"MSG"
[docs]class ChunkType(object):
Invalid = b"0" # FIXME check
Single = b"F"
Intermediate = b"C"
Abort = b"A" # when an error occurred and the Message is aborted (body is ErrorMessage)
[docs]class ErrorMessage(uatypes.FrozenClass):
ua_types = (('Error', 'StatusCode'), ('Reason', 'String'), )
def __init__(self):
self.Error = uatypes.StatusCode()
self.Reason = ""
self._freeze = True
def __str__(self):
return "MessageAbort(error:{0}, reason:{1})".format(self.Error, self.Reason)
__repr__ = __str__
[docs]class Acknowledge(uatypes.FrozenClass):
ua_types = [
("ProtocolVersion", "UInt32"),
("ReceiveBufferSize", "UInt32"),
("SendBufferSize", "UInt32"),
("MaxMessageSize", "UInt32"),
("MaxChunkCount", "UInt32"),
]
def __init__(self):
self.ProtocolVersion = 0
self.ReceiveBufferSize = 65536
self.SendBufferSize = 65536
self.MaxMessageSize = 0 # No limits
self.MaxChunkCount = 0 # No limits
self._freeze = True
[docs]class CryptographyNone:
"""
Base class for symmetric/asymmetric cryprography
"""
def __init__(self):
pass
[docs] def plain_block_size(self):
"""
Size of plain text block for block cipher.
"""
return 1
[docs] def encrypted_block_size(self):
"""
Size of encrypted text block for block cipher.
"""
return 1
[docs] def padding(self, size):
"""
Create padding for a block of given size.
plain_size = size + len(padding) + signature_size()
plain_size = N * plain_block_size()
"""
return b''
[docs] def min_padding_size(self):
return 0
[docs] def signature_size(self):
return 0
[docs] def signature(self, data):
return b''
[docs] def encrypt(self, data):
return data
[docs] def decrypt(self, data):
return data
[docs] def vsignature_size(self):
return 0
[docs] def verify(self, data, signature):
"""
Verify signature and raise exception if signature is invalid
"""
pass
[docs] def remove_padding(self, data):
return data
[docs]class SecurityPolicy(object):
"""
Base class for security policy
"""
URI = "http://opcfoundation.org/UA/SecurityPolicy#None"
AsymmetricSignatureURI = ""
signature_key_size = 0
symmetric_key_size = 0
def __init__(self):
self.asymmetric_cryptography = CryptographyNone()
self.symmetric_cryptography = CryptographyNone()
self.Mode = auto.MessageSecurityMode.None_
self.server_certificate = None
self.client_certificate = None
[docs] def make_local_symmetric_key(self, secret, seed):
pass
[docs] def make_remote_symmetric_key(self, secret, seed):
pass
[docs]class SecurityPolicyFactory(object):
"""
Helper class for creating server-side SecurityPolicy.
Server has one certificate and private key, but needs a separate
SecurityPolicy for every client and client's certificate
"""
def __init__(self, cls=SecurityPolicy, mode=auto.MessageSecurityMode.None_, certificate=None, private_key=None):
self.cls = cls
self.mode = mode
self.certificate = certificate
self.private_key = private_key
[docs] def matches(self, uri, mode=None):
return self.cls.URI == uri and (mode is None or self.mode == mode)
[docs] def create(self, peer_certificate):
if self.cls is SecurityPolicy:
return self.cls()
else:
return self.cls(peer_certificate, self.certificate, self.private_key, self.mode)
[docs]class Message(object):
def __init__(self, chunks):
self._chunks = chunks
def __str__(self):
return 'Message(' + str(self._chunks) + ')'
__repr__ = __str__
[docs] def message_type(self):
return self._chunks[0].MessageHeader.MessageType
[docs] def request_id(self):
return self._chunks[0].SequenceHeader.RequestId
[docs] def body(self):
body = b"".join([c.Body for c in self._chunks])
return utils.Buffer(body)
# FIXES for missing switchfield in NodeAttributes classes
ana = auto.NodeAttributesMask
[docs]class ObjectAttributes(auto.ObjectAttributes):
def __init__(self):
auto.ObjectAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.EventNotifier
[docs]class ObjectTypeAttributes(auto.ObjectTypeAttributes):
def __init__(self):
auto.ObjectTypeAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
[docs]class VariableAttributes(auto.VariableAttributes):
def __init__(self):
auto.VariableAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.AccessLevel | ana.UserAccessLevel | ana.MinimumSamplingInterval | ana.Historizing
self.Historizing = False
self.AccessLevel = AccessLevel.CurrentRead.mask
self.UserAccessLevel = AccessLevel.CurrentRead.mask
[docs]class VariableTypeAttributes(auto.VariableTypeAttributes):
def __init__(self):
auto.VariableTypeAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Value | ana.DataType | ana.ValueRank | ana.ArrayDimensions | ana.IsAbstract
[docs]class MethodAttributes(auto.MethodAttributes):
def __init__(self):
auto.MethodAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.Executable | ana.UserExecutable
[docs]class ReferenceTypeAttributes(auto.ReferenceTypeAttributes):
def __init__(self):
auto.ReferenceTypeAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract | ana.Symmetric | ana.InverseName
[docs]class DataTypeAttributes(auto.DataTypeAttributes):
def __init__(self):
auto.DataTypeAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.IsAbstract
[docs]class ViewAttributes(auto.ViewAttributes):
def __init__(self):
auto.ViewAttributes.__init__(self)
self.SpecifiedAttributes = ana.DisplayName | ana.Description | ana.WriteMask | ana.UserWriteMask | ana.ContainsNoLoops | ana.EventNotifier
[docs]class Argument(auto.Argument):
def __init__(self):
auto.Argument.__init__(self)
self.ValueRank = -2
[docs]class XmlElement(FrozenClass):
'''
An XML element encoded as a UTF-8 string.
:ivar Value:
:vartype Value: String
'''
ua_types = [
('Value', 'String'),
]
def __init__(self, xml=""):
self.Value = xml
self._freeze = True
def __str__(self):
return 'XmlElement(' + 'Value:' + str(self.Value) + ')'
__repr__ = __str__
def __eq__(self, el):
return isinstance(el, XmlElement) and self.Value == el.Value