Source code for opcua.crypto.security_policies

import logging
import struct

from abc import ABCMeta, abstractmethod
from opcua.ua import CryptographyNone, SecurityPolicy
from opcua.ua import MessageSecurityMode
from opcua.ua import UaError
try:
    from opcua.crypto import uacrypto
    CRYPTOGRAPHY_AVAILABLE = True
except ImportError:
    CRYPTOGRAPHY_AVAILABLE = False


POLICY_NONE_URI = 'http://opcfoundation.org/UA/SecurityPolicy#None'


[docs]def require_cryptography(obj): """ Raise exception if cryptography module is not available. Call this function in constructors. """ if not CRYPTOGRAPHY_AVAILABLE: raise UaError("Can't use {0}, cryptography module is not installed".format(obj.__class__.__name__))
[docs]class Signer(object): """ Abstract base class for cryptographic signature algorithm """ __metaclass__ = ABCMeta
[docs] @abstractmethod def signature_size(self): pass
[docs] @abstractmethod def signature(self, data): pass
[docs]class Verifier(object): """ Abstract base class for cryptographic signature verification """ __metaclass__ = ABCMeta
[docs] @abstractmethod def signature_size(self): pass
[docs] @abstractmethod def verify(self, data, signature): pass
[docs]class Encryptor(object): """ Abstract base class for encryption algorithm """ __metaclass__ = ABCMeta
[docs] @abstractmethod def plain_block_size(self): pass
[docs] @abstractmethod def encrypted_block_size(self): pass
[docs] @abstractmethod def encrypt(self, data): pass
[docs]class Decryptor(object): """ Abstract base class for decryption algorithm """ __metaclass__ = ABCMeta
[docs] @abstractmethod def plain_block_size(self): pass
[docs] @abstractmethod def encrypted_block_size(self): pass
[docs] @abstractmethod def decrypt(self, data): pass
[docs]class Cryptography(CryptographyNone): """ Security policy: Sign or SignAndEncrypt """ def __init__(self, mode=MessageSecurityMode.Sign): self.Signer = None self.Verifier = None self.Encryptor = None self.Decryptor = None assert mode in (MessageSecurityMode.Sign, MessageSecurityMode.SignAndEncrypt) self.is_encrypted = (mode == MessageSecurityMode.SignAndEncrypt)
[docs] def plain_block_size(self): """ Size of plain text block for block cipher. """ if self.is_encrypted: return self.Encryptor.plain_block_size() return 1
[docs] def encrypted_block_size(self): """ Size of encrypted text block for block cipher. """ if self.is_encrypted: return self.Encryptor.encrypted_block_size() return 1
[docs] def padding(self, size): """ Create padding for a block of given size. plain_size = size + len(padding) + signature_size() plain_size = N * plain_block_size() """ if not self.is_encrypted: return b'' block_size = self.Encryptor.plain_block_size() extrapad_size = 2 if self.Encryptor.encrypted_block_size() > 256 else 1 rem = (size + self.signature_size() + extrapad_size) % block_size if rem != 0: rem = block_size - rem data = bytes(bytearray([rem%256])) * (rem + 1) if self.Encryptor.encrypted_block_size() > 256: data = data + bytes(bytearray([rem>>8])) return data
[docs] def min_padding_size(self): if self.is_encrypted: return 1 return 0
[docs] def signature_size(self): return self.Signer.signature_size()
[docs] def signature(self, data): return self.Signer.signature(data)
[docs] def vsignature_size(self): return self.Verifier.signature_size()
[docs] def verify(self, data, sig): self.Verifier.verify(data, sig)
[docs] def encrypt(self, data): if self.is_encrypted: assert len(data) % self.Encryptor.plain_block_size() == 0 return self.Encryptor.encrypt(data) return data
[docs] def decrypt(self, data): if self.is_encrypted: return self.Decryptor.decrypt(data) return data
[docs] def remove_padding(self, data): if self.is_encrypted: if self.Decryptor.encrypted_block_size() > 256: pad_size = struct.unpack('<h', data[-2:])[0] + 2 else: pad_size = bytearray(data[-1:])[0] + 1 return data[:-pad_size] return data
[docs]class SignerRsa(Signer): def __init__(self, client_pk): require_cryptography(self) self.client_pk = client_pk self.key_size = self.client_pk.key_size // 8
[docs] def signature_size(self): return self.key_size
[docs] def signature(self, data): return uacrypto.sign_sha1(self.client_pk, data)
[docs]class VerifierRsa(Verifier): def __init__(self, server_cert): require_cryptography(self) self.server_cert = server_cert self.key_size = self.server_cert.public_key().key_size // 8
[docs] def signature_size(self): return self.key_size
[docs] def verify(self, data, signature): uacrypto.verify_sha1(self.server_cert, data, signature)
[docs]class EncryptorRsa(Encryptor): def __init__(self, server_cert, enc_fn, padding_size): require_cryptography(self) self.server_cert = server_cert self.key_size = self.server_cert.public_key().key_size // 8 self.encryptor = enc_fn self.padding_size = padding_size
[docs] def plain_block_size(self): return self.key_size - self.padding_size
[docs] def encrypted_block_size(self): return self.key_size
[docs] def encrypt(self, data): encrypted = b'' block_size = self.plain_block_size() for i in range(0, len(data), block_size): encrypted += self.encryptor(self.server_cert.public_key(), data[i: i + block_size]) return encrypted
[docs]class DecryptorRsa(Decryptor): def __init__(self, client_pk, dec_fn, padding_size): require_cryptography(self) self.client_pk = client_pk self.key_size = self.client_pk.key_size // 8 self.decryptor = dec_fn self.padding_size = padding_size
[docs] def plain_block_size(self): return self.key_size - self.padding_size
[docs] def encrypted_block_size(self): return self.key_size
[docs] def decrypt(self, data): decrypted = b'' block_size = self.encrypted_block_size() for i in range(0, len(data), block_size): decrypted += self.decryptor(self.client_pk, data[i: i + block_size]) return decrypted
[docs]class SignerAesCbc(Signer): def __init__(self, key): require_cryptography(self) self.key = key
[docs] def signature_size(self): return uacrypto.sha1_size()
[docs] def signature(self, data): return uacrypto.hmac_sha1(self.key, data)
[docs]class VerifierAesCbc(Verifier): def __init__(self, key): require_cryptography(self) self.key = key
[docs] def signature_size(self): return uacrypto.sha1_size()
[docs] def verify(self, data, signature): expected = uacrypto.hmac_sha1(self.key, data) if signature != expected: raise uacrypto.InvalidSignature
[docs]class EncryptorAesCbc(Encryptor): def __init__(self, key, init_vec): require_cryptography(self) self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)
[docs] def plain_block_size(self): return self.cipher.algorithm.key_size // 8
[docs] def encrypted_block_size(self): return self.cipher.algorithm.key_size // 8
[docs] def encrypt(self, data): return uacrypto.cipher_encrypt(self.cipher, data)
[docs]class DecryptorAesCbc(Decryptor): def __init__(self, key, init_vec): require_cryptography(self) self.cipher = uacrypto.cipher_aes_cbc(key, init_vec)
[docs] def plain_block_size(self): return self.cipher.algorithm.key_size // 8
[docs] def encrypted_block_size(self): return self.cipher.algorithm.key_size // 8
[docs] def decrypt(self, data): return uacrypto.cipher_decrypt(self.cipher, data)
[docs]class SignerSha256(Signer): def __init__(self, client_pk): require_cryptography(self) self.client_pk = client_pk self.key_size = self.client_pk.key_size // 8
[docs] def signature_size(self): return self.key_size
[docs] def signature(self, data): return uacrypto.sign_sha256(self.client_pk, data)
[docs]class VerifierSha256(Verifier): def __init__(self, server_cert): require_cryptography(self) self.server_cert = server_cert self.key_size = self.server_cert.public_key().key_size // 8
[docs] def signature_size(self): return self.key_size
[docs] def verify(self, data, signature): uacrypto.verify_sha256(self.server_cert, data, signature)
[docs]class SignerHMac256(Signer): def __init__(self, key): require_cryptography(self) self.key = key
[docs] def signature_size(self): return uacrypto.sha256_size()
[docs] def signature(self, data): return uacrypto.hmac_sha256(self.key, data)
[docs]class VerifierHMac256(Verifier): def __init__(self, key): require_cryptography(self) self.key = key
[docs] def signature_size(self): return uacrypto.sha256_size()
[docs] def verify(self, data, signature): expected = uacrypto.hmac_sha256(self.key, data) if signature != expected: raise uacrypto.InvalidSignature
[docs]class SecurityPolicyBasic128Rsa15(SecurityPolicy): """ DEPRECATED, do not use anymore! Security Basic 128Rsa15 A suite of algorithms that uses RSA15 as Key-Wrap-algorithm and 128-Bit (16 bytes) for encryption algorithms. - SymmetricSignatureAlgorithm - HmacSha1 (http://www.w3.org/2000/09/xmldsig#hmac-sha1) - SymmetricEncryptionAlgorithm - Aes128 (http://www.w3.org/2001/04/xmlenc#aes128-cbc) - AsymmetricSignatureAlgorithm - RsaSha1 (http://www.w3.org/2000/09/xmldsig#rsa-sha1) - AsymmetricKeyWrapAlgorithm - KwRsa15 (http://www.w3.org/2001/04/xmlenc#rsa-1_5) - AsymmetricEncryptionAlgorithm - Rsa15 (http://www.w3.org/2001/04/xmlenc#rsa-1_5) - KeyDerivationAlgorithm - PSha1 (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1) - DerivedSignatureKeyLength - 128 (16 bytes) - MinAsymmetricKeyLength - 1024 (128 bytes) - MaxAsymmetricKeyLength - 2048 (256 bytes) - CertificateSignatureAlgorithm - Sha1 If a certificate or any certificate in the chain is not signed with a hash that is Sha1 or stronger then the certificate shall be rejected. """ URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic128Rsa15" signature_key_size = 16 symmetric_key_size = 16 AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-1_5" AsymmetricSignatureURI = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
[docs] @staticmethod def encrypt_asymmetric(pubkey, data): return uacrypto.encrypt_rsa15(pubkey, data)
def __init__(self, server_cert, client_cert, client_pk, mode): logger = logging.getLogger(__name__) logger.warning("DEPRECATED! Do not use SecurityPolicyBasic128Rsa15 anymore!") require_cryptography(self) if isinstance(server_cert, bytes): server_cert = uacrypto.x509_from_der(server_cert) # even in Sign mode we need to asymmetrically encrypt secrets # transmitted in OpenSecureChannel. So SignAndEncrypt here self.asymmetric_cryptography = Cryptography( MessageSecurityMode.SignAndEncrypt) self.asymmetric_cryptography.Signer = SignerRsa(client_pk) self.asymmetric_cryptography.Verifier = VerifierRsa(server_cert) self.asymmetric_cryptography.Encryptor = EncryptorRsa( server_cert, uacrypto.encrypt_rsa15, 11) self.asymmetric_cryptography.Decryptor = DecryptorRsa( client_pk, uacrypto.decrypt_rsa15, 11) self.symmetric_cryptography = Cryptography(mode) self.Mode = mode self.server_certificate = uacrypto.der_from_x509(server_cert) self.client_certificate = uacrypto.der_from_x509(client_cert)
[docs] def make_local_symmetric_key(self, secret, seed): key_sizes = (self.signature_key_size, self.symmetric_key_size, 16) (sigkey, key, init_vec) = uacrypto.p_sha1(secret, seed, key_sizes) self.symmetric_cryptography.Signer = SignerAesCbc(sigkey) self.symmetric_cryptography.Encryptor = EncryptorAesCbc(key, init_vec)
[docs] def make_remote_symmetric_key(self, secret, seed): key_sizes = (self.signature_key_size, self.symmetric_key_size, 16) (sigkey, key, init_vec) = uacrypto.p_sha1(secret, seed, key_sizes) self.symmetric_cryptography.Verifier = VerifierAesCbc(sigkey) self.symmetric_cryptography.Decryptor = DecryptorAesCbc(key, init_vec)
[docs]class SecurityPolicyBasic256(SecurityPolicy): """ DEPRECATED, do not use anymore! Security Basic 256 A suite of algorithms that are for 256-Bit (32 bytes) encryption, algorithms include: - SymmetricSignatureAlgorithm - HmacSha1 (http://www.w3.org/2000/09/xmldsig#hmac-sha1) - SymmetricEncryptionAlgorithm - Aes256 (http://www.w3.org/2001/04/xmlenc#aes256-cbc) - AsymmetricSignatureAlgorithm - RsaSha1 (http://www.w3.org/2000/09/xmldsig#rsa-sha1) - AsymmetricKeyWrapAlgorithm - KwRsaOaep (http://www.w3.org/2001/04/xmlenc#rsa-oaep-mgf1p) - AsymmetricEncryptionAlgorithm - RsaOaep (http://www.w3.org/2001/04/xmlenc#rsa-oaep) - KeyDerivationAlgorithm - PSha1 (http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha1) - DerivedSignatureKeyLength - 192 (24 bytes) - MinAsymmetricKeyLength - 1024 (128 bytes) - MaxAsymmetricKeyLength - 2048 (256 bytes) - CertificateSignatureAlgorithm - Sha1 If a certificate or any certificate in the chain is not signed with a hash that is Sha1 or stronger then the certificate shall be rejected. """ URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256" signature_key_size = 24 symmetric_key_size = 32 AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-oaep" AsymmetricSignatureURI = "http://www.w3.org/2000/09/xmldsig#rsa-sha1"
[docs] @staticmethod def encrypt_asymmetric(pubkey, data): return uacrypto.encrypt_rsa_oaep(pubkey, data)
def __init__(self, server_cert, client_cert, client_pk, mode): logger = logging.getLogger(__name__) logger.warning("DEPRECATED! Do not use SecurityPolicyBasic256 anymore!") require_cryptography(self) if isinstance(server_cert, bytes): server_cert = uacrypto.x509_from_der(server_cert) # even in Sign mode we need to asymmetrically encrypt secrets # transmitted in OpenSecureChannel. So SignAndEncrypt here self.asymmetric_cryptography = Cryptography( MessageSecurityMode.SignAndEncrypt) self.asymmetric_cryptography.Signer = SignerRsa(client_pk) self.asymmetric_cryptography.Verifier = VerifierRsa(server_cert) self.asymmetric_cryptography.Encryptor = EncryptorRsa( server_cert, uacrypto.encrypt_rsa_oaep, 42) self.asymmetric_cryptography.Decryptor = DecryptorRsa( client_pk, uacrypto.decrypt_rsa_oaep, 42) self.symmetric_cryptography = Cryptography(mode) self.Mode = mode self.server_certificate = uacrypto.der_from_x509(server_cert) self.client_certificate = uacrypto.der_from_x509(client_cert)
[docs] def make_local_symmetric_key(self, secret, seed): # specs part 6, 6.7.5 key_sizes = (self.signature_key_size, self.symmetric_key_size, 16) (sigkey, key, init_vec) = uacrypto.p_sha1(secret, seed, key_sizes) self.symmetric_cryptography.Signer = SignerAesCbc(sigkey) self.symmetric_cryptography.Encryptor = EncryptorAesCbc(key, init_vec)
[docs] def make_remote_symmetric_key(self, secret, seed): # specs part 6, 6.7.5 key_sizes = (self.signature_key_size, self.symmetric_key_size, 16) (sigkey, key, init_vec) = uacrypto.p_sha1(secret, seed, key_sizes) self.symmetric_cryptography.Verifier = VerifierAesCbc(sigkey) self.symmetric_cryptography.Decryptor = DecryptorAesCbc(key, init_vec)
[docs]class SecurityPolicyBasic256Sha256(SecurityPolicy): """ Security Basic 256Sha256 A suite of algorithms that uses Sha256 as Key-Wrap-algorithm and 256-Bit (32 bytes) for encryption algorithms. - SymmetricSignatureAlgorithm_HMAC-SHA2-256 https://tools.ietf.org/html/rfc4634 - SymmetricEncryptionAlgorithm_AES256-CBC http://www.w3.org/2001/04/xmlenc#aes256-cbc - AsymmetricSignatureAlgorithm_RSA-PKCS15-SHA2-256 http://www.w3.org/2001/04/xmldsig-more#rsa-sha256 - AsymmetricEncryptionAlgorithm_RSA-OAEP-SHA1 http://www.w3.org/2001/04/xmlenc#rsa-oaep - KeyDerivationAlgorithm_P-SHA2-256 http://docs.oasis-open.org/ws-sx/ws-secureconversation/200512/dk/p_sha256 - CertificateSignatureAlgorithm_RSA-PKCS15-SHA2-256 http://www.w3.org/2001/04/xmldsig-more#rsa-sha256 - Basic256Sha256_Limits -> DerivedSignatureKeyLength: 256 bits -> MinAsymmetricKeyLength: 2048 bits -> MaxAsymmetricKeyLength: 4096 bits -> SecureChannelNonceLength: 32 bytes """ URI = "http://opcfoundation.org/UA/SecurityPolicy#Basic256Sha256" signature_key_size = 32 symmetric_key_size = 32 AsymmetricEncryptionURI = "http://www.w3.org/2001/04/xmlenc#rsa-oaep" AsymmetricSignatureURI = "http://www.w3.org/2001/04/xmldsig-more#rsa-sha256"
[docs] @staticmethod def encrypt_asymmetric(pubkey, data): return uacrypto.encrypt_rsa_oaep(pubkey, data)
def __init__(self, server_cert, client_cert, client_pk, mode): require_cryptography(self) if isinstance(server_cert, bytes): server_cert = uacrypto.x509_from_der(server_cert) # even in Sign mode we need to asymmetrically encrypt secrets # transmitted in OpenSecureChannel. So SignAndEncrypt here self.asymmetric_cryptography = Cryptography( MessageSecurityMode.SignAndEncrypt) self.asymmetric_cryptography.Signer = SignerSha256(client_pk) self.asymmetric_cryptography.Verifier = VerifierSha256(server_cert) self.asymmetric_cryptography.Encryptor = EncryptorRsa( server_cert, uacrypto.encrypt_rsa_oaep, 42) self.asymmetric_cryptography.Decryptor = DecryptorRsa( client_pk, uacrypto.decrypt_rsa_oaep, 42) self.symmetric_cryptography = Cryptography(mode) self.Mode = mode self.server_certificate = uacrypto.der_from_x509(server_cert) self.client_certificate = uacrypto.der_from_x509(client_cert)
[docs] def make_local_symmetric_key(self, secret, seed): # specs part 6, 6.7.5 key_sizes = (self.signature_key_size, self.symmetric_key_size, 16) (sigkey, key, init_vec) = uacrypto.p_sha256(secret, seed, key_sizes) self.symmetric_cryptography.Signer = SignerHMac256(sigkey) self.symmetric_cryptography.Encryptor = EncryptorAesCbc(key, init_vec)
[docs] def make_remote_symmetric_key(self, secret, seed): # specs part 6, 6.7.5 key_sizes = (self.signature_key_size, self.symmetric_key_size, 16) (sigkey, key, init_vec) = uacrypto.p_sha256(secret, seed, key_sizes) self.symmetric_cryptography.Verifier = VerifierHMac256(sigkey) self.symmetric_cryptography.Decryptor = DecryptorAesCbc(key, init_vec)
[docs]def encrypt_asymmetric(pubkey, data, policy_uri): """ Encrypt data with pubkey using an asymmetric algorithm. The algorithm is selected by policy_uri. Returns a tuple (encrypted_data, algorithm_uri) """ for cls in [SecurityPolicyBasic256Sha256, SecurityPolicyBasic256, SecurityPolicyBasic128Rsa15]: if policy_uri == cls.URI: return (cls.encrypt_asymmetric(pubkey, data), cls.AsymmetricEncryptionURI) if not policy_uri or policy_uri == POLICY_NONE_URI: return (data, '') raise UaError("Unsupported security policy `{0}`".format(policy_uri))