mirror of
https://github.com/PiBrewing/craftbeerpi4.git
synced 2024-12-13 08:54:55 +01:00
2776 lines
104 KiB
Python
2776 lines
104 KiB
Python
# This file is dual licensed under the terms of the Apache License, Version
|
|
# 2.0, and the BSD License. See the LICENSE file in the root of this repository
|
|
# for complete details.
|
|
|
|
from __future__ import absolute_import, division, print_function
|
|
|
|
import collections
|
|
import contextlib
|
|
import itertools
|
|
import warnings
|
|
from contextlib import contextmanager
|
|
|
|
import six
|
|
from six.moves import range
|
|
|
|
from cryptography import utils, x509
|
|
from cryptography.exceptions import UnsupportedAlgorithm, _Reasons
|
|
from cryptography.hazmat._der import (
|
|
INTEGER,
|
|
NULL,
|
|
SEQUENCE,
|
|
encode_der,
|
|
encode_der_integer,
|
|
)
|
|
from cryptography.hazmat.backends.interfaces import (
|
|
CMACBackend,
|
|
CipherBackend,
|
|
DERSerializationBackend,
|
|
DHBackend,
|
|
DSABackend,
|
|
EllipticCurveBackend,
|
|
HMACBackend,
|
|
HashBackend,
|
|
PBKDF2HMACBackend,
|
|
PEMSerializationBackend,
|
|
RSABackend,
|
|
ScryptBackend,
|
|
X509Backend,
|
|
)
|
|
from cryptography.hazmat.backends.openssl import aead
|
|
from cryptography.hazmat.backends.openssl.ciphers import _CipherContext
|
|
from cryptography.hazmat.backends.openssl.cmac import _CMACContext
|
|
from cryptography.hazmat.backends.openssl.decode_asn1 import (
|
|
_CRL_ENTRY_REASON_ENUM_TO_CODE,
|
|
_CRL_EXTENSION_HANDLERS,
|
|
_EXTENSION_HANDLERS_BASE,
|
|
_EXTENSION_HANDLERS_SCT,
|
|
_OCSP_BASICRESP_EXTENSION_HANDLERS,
|
|
_OCSP_REQ_EXTENSION_HANDLERS,
|
|
_OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT,
|
|
_REVOKED_EXTENSION_HANDLERS,
|
|
_X509ExtensionParser,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.dh import (
|
|
_DHParameters,
|
|
_DHPrivateKey,
|
|
_DHPublicKey,
|
|
_dh_params_dup,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.dsa import (
|
|
_DSAParameters,
|
|
_DSAPrivateKey,
|
|
_DSAPublicKey,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.ec import (
|
|
_EllipticCurvePrivateKey,
|
|
_EllipticCurvePublicKey,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.ed25519 import (
|
|
_Ed25519PrivateKey,
|
|
_Ed25519PublicKey,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.ed448 import (
|
|
_ED448_KEY_SIZE,
|
|
_Ed448PrivateKey,
|
|
_Ed448PublicKey,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.encode_asn1 import (
|
|
_CRL_ENTRY_EXTENSION_ENCODE_HANDLERS,
|
|
_CRL_EXTENSION_ENCODE_HANDLERS,
|
|
_EXTENSION_ENCODE_HANDLERS,
|
|
_OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS,
|
|
_OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS,
|
|
_encode_asn1_int_gc,
|
|
_encode_asn1_str_gc,
|
|
_encode_name_gc,
|
|
_txt2obj_gc,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.hashes import _HashContext
|
|
from cryptography.hazmat.backends.openssl.hmac import _HMACContext
|
|
from cryptography.hazmat.backends.openssl.ocsp import (
|
|
_OCSPRequest,
|
|
_OCSPResponse,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.poly1305 import (
|
|
_POLY1305_KEY_SIZE,
|
|
_Poly1305Context,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.rsa import (
|
|
_RSAPrivateKey,
|
|
_RSAPublicKey,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.x25519 import (
|
|
_X25519PrivateKey,
|
|
_X25519PublicKey,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.x448 import (
|
|
_X448PrivateKey,
|
|
_X448PublicKey,
|
|
)
|
|
from cryptography.hazmat.backends.openssl.x509 import (
|
|
_Certificate,
|
|
_CertificateRevocationList,
|
|
_CertificateSigningRequest,
|
|
_RevokedCertificate,
|
|
)
|
|
from cryptography.hazmat.bindings.openssl import binding
|
|
from cryptography.hazmat.primitives import hashes, serialization
|
|
from cryptography.hazmat.primitives.asymmetric import (
|
|
dh,
|
|
dsa,
|
|
ec,
|
|
ed25519,
|
|
ed448,
|
|
rsa,
|
|
)
|
|
from cryptography.hazmat.primitives.asymmetric.padding import (
|
|
MGF1,
|
|
OAEP,
|
|
PKCS1v15,
|
|
PSS,
|
|
)
|
|
from cryptography.hazmat.primitives.ciphers.algorithms import (
|
|
AES,
|
|
ARC4,
|
|
Blowfish,
|
|
CAST5,
|
|
Camellia,
|
|
ChaCha20,
|
|
IDEA,
|
|
SEED,
|
|
TripleDES,
|
|
)
|
|
from cryptography.hazmat.primitives.ciphers.modes import (
|
|
CBC,
|
|
CFB,
|
|
CFB8,
|
|
CTR,
|
|
ECB,
|
|
GCM,
|
|
OFB,
|
|
XTS,
|
|
)
|
|
from cryptography.hazmat.primitives.kdf import scrypt
|
|
from cryptography.hazmat.primitives.serialization import pkcs7, ssh
|
|
from cryptography.x509 import ocsp
|
|
|
|
|
|
_MemoryBIO = collections.namedtuple("_MemoryBIO", ["bio", "char_ptr"])
|
|
|
|
|
|
# Not actually supported, just used as a marker for some serialization tests.
|
|
class _RC2(object):
|
|
pass
|
|
|
|
|
|
@utils.register_interface(CipherBackend)
|
|
@utils.register_interface(CMACBackend)
|
|
@utils.register_interface(DERSerializationBackend)
|
|
@utils.register_interface(DHBackend)
|
|
@utils.register_interface(DSABackend)
|
|
@utils.register_interface(EllipticCurveBackend)
|
|
@utils.register_interface(HashBackend)
|
|
@utils.register_interface(HMACBackend)
|
|
@utils.register_interface(PBKDF2HMACBackend)
|
|
@utils.register_interface(RSABackend)
|
|
@utils.register_interface(PEMSerializationBackend)
|
|
@utils.register_interface(X509Backend)
|
|
@utils.register_interface_if(
|
|
binding.Binding().lib.Cryptography_HAS_SCRYPT, ScryptBackend
|
|
)
|
|
class Backend(object):
|
|
"""
|
|
OpenSSL API binding interfaces.
|
|
"""
|
|
|
|
name = "openssl"
|
|
|
|
# FIPS has opinions about acceptable algorithms and key sizes, but the
|
|
# disallowed algorithms are still present in OpenSSL. They just error if
|
|
# you try to use them. To avoid that we allowlist the algorithms in
|
|
# FIPS 140-3. This isn't ideal, but FIPS 140-3 is trash so here we are.
|
|
_fips_aead = {
|
|
b"aes-128-ccm",
|
|
b"aes-192-ccm",
|
|
b"aes-256-ccm",
|
|
b"aes-128-gcm",
|
|
b"aes-192-gcm",
|
|
b"aes-256-gcm",
|
|
}
|
|
_fips_ciphers = (AES, TripleDES)
|
|
_fips_hashes = (
|
|
hashes.SHA1,
|
|
hashes.SHA224,
|
|
hashes.SHA256,
|
|
hashes.SHA384,
|
|
hashes.SHA512,
|
|
hashes.SHA512_224,
|
|
hashes.SHA512_256,
|
|
hashes.SHA3_224,
|
|
hashes.SHA3_256,
|
|
hashes.SHA3_384,
|
|
hashes.SHA3_512,
|
|
hashes.SHAKE128,
|
|
hashes.SHAKE256,
|
|
)
|
|
_fips_rsa_min_key_size = 2048
|
|
_fips_rsa_min_public_exponent = 65537
|
|
_fips_dsa_min_modulus = 1 << 2048
|
|
_fips_dh_min_key_size = 2048
|
|
_fips_dh_min_modulus = 1 << _fips_dh_min_key_size
|
|
|
|
def __init__(self):
|
|
self._binding = binding.Binding()
|
|
self._ffi = self._binding.ffi
|
|
self._lib = self._binding.lib
|
|
self._fips_enabled = self._is_fips_enabled()
|
|
|
|
self._cipher_registry = {}
|
|
self._register_default_ciphers()
|
|
self._register_x509_ext_parsers()
|
|
self._register_x509_encoders()
|
|
if self._fips_enabled and self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
|
|
warnings.warn(
|
|
"OpenSSL FIPS mode is enabled. Can't enable DRBG fork safety.",
|
|
UserWarning,
|
|
)
|
|
else:
|
|
self.activate_osrandom_engine()
|
|
self._dh_types = [self._lib.EVP_PKEY_DH]
|
|
if self._lib.Cryptography_HAS_EVP_PKEY_DHX:
|
|
self._dh_types.append(self._lib.EVP_PKEY_DHX)
|
|
|
|
def openssl_assert(self, ok, errors=None):
|
|
return binding._openssl_assert(self._lib, ok, errors=errors)
|
|
|
|
def _is_fips_enabled(self):
|
|
fips_mode = getattr(self._lib, "FIPS_mode", lambda: 0)
|
|
mode = fips_mode()
|
|
if mode == 0:
|
|
# OpenSSL without FIPS pushes an error on the error stack
|
|
self._lib.ERR_clear_error()
|
|
return bool(mode)
|
|
|
|
def activate_builtin_random(self):
|
|
if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
|
|
# Obtain a new structural reference.
|
|
e = self._lib.ENGINE_get_default_RAND()
|
|
if e != self._ffi.NULL:
|
|
self._lib.ENGINE_unregister_RAND(e)
|
|
# Reset the RNG to use the built-in.
|
|
res = self._lib.RAND_set_rand_method(self._ffi.NULL)
|
|
self.openssl_assert(res == 1)
|
|
# decrement the structural reference from get_default_RAND
|
|
res = self._lib.ENGINE_finish(e)
|
|
self.openssl_assert(res == 1)
|
|
|
|
@contextlib.contextmanager
|
|
def _get_osurandom_engine(self):
|
|
# Fetches an engine by id and returns it. This creates a structural
|
|
# reference.
|
|
e = self._lib.ENGINE_by_id(self._lib.Cryptography_osrandom_engine_id)
|
|
self.openssl_assert(e != self._ffi.NULL)
|
|
# Initialize the engine for use. This adds a functional reference.
|
|
res = self._lib.ENGINE_init(e)
|
|
self.openssl_assert(res == 1)
|
|
|
|
try:
|
|
yield e
|
|
finally:
|
|
# Decrement the structural ref incremented by ENGINE_by_id.
|
|
res = self._lib.ENGINE_free(e)
|
|
self.openssl_assert(res == 1)
|
|
# Decrement the functional ref incremented by ENGINE_init.
|
|
res = self._lib.ENGINE_finish(e)
|
|
self.openssl_assert(res == 1)
|
|
|
|
def activate_osrandom_engine(self):
|
|
if self._lib.CRYPTOGRAPHY_NEEDS_OSRANDOM_ENGINE:
|
|
# Unregister and free the current engine.
|
|
self.activate_builtin_random()
|
|
with self._get_osurandom_engine() as e:
|
|
# Set the engine as the default RAND provider.
|
|
res = self._lib.ENGINE_set_default_RAND(e)
|
|
self.openssl_assert(res == 1)
|
|
# Reset the RNG to use the engine
|
|
res = self._lib.RAND_set_rand_method(self._ffi.NULL)
|
|
self.openssl_assert(res == 1)
|
|
|
|
def osrandom_engine_implementation(self):
|
|
buf = self._ffi.new("char[]", 64)
|
|
with self._get_osurandom_engine() as e:
|
|
res = self._lib.ENGINE_ctrl_cmd(
|
|
e, b"get_implementation", len(buf), buf, self._ffi.NULL, 0
|
|
)
|
|
self.openssl_assert(res > 0)
|
|
return self._ffi.string(buf).decode("ascii")
|
|
|
|
def openssl_version_text(self):
|
|
"""
|
|
Friendly string name of the loaded OpenSSL library. This is not
|
|
necessarily the same version as it was compiled against.
|
|
|
|
Example: OpenSSL 1.1.1d 10 Sep 2019
|
|
"""
|
|
return self._ffi.string(
|
|
self._lib.OpenSSL_version(self._lib.OPENSSL_VERSION)
|
|
).decode("ascii")
|
|
|
|
def openssl_version_number(self):
|
|
return self._lib.OpenSSL_version_num()
|
|
|
|
def create_hmac_ctx(self, key, algorithm):
|
|
return _HMACContext(self, key, algorithm)
|
|
|
|
def _evp_md_from_algorithm(self, algorithm):
|
|
if algorithm.name == "blake2b" or algorithm.name == "blake2s":
|
|
alg = "{}{}".format(
|
|
algorithm.name, algorithm.digest_size * 8
|
|
).encode("ascii")
|
|
else:
|
|
alg = algorithm.name.encode("ascii")
|
|
|
|
evp_md = self._lib.EVP_get_digestbyname(alg)
|
|
return evp_md
|
|
|
|
def _evp_md_non_null_from_algorithm(self, algorithm):
|
|
evp_md = self._evp_md_from_algorithm(algorithm)
|
|
self.openssl_assert(evp_md != self._ffi.NULL)
|
|
return evp_md
|
|
|
|
def hash_supported(self, algorithm):
|
|
if self._fips_enabled and not isinstance(algorithm, self._fips_hashes):
|
|
return False
|
|
|
|
evp_md = self._evp_md_from_algorithm(algorithm)
|
|
return evp_md != self._ffi.NULL
|
|
|
|
def hmac_supported(self, algorithm):
|
|
return self.hash_supported(algorithm)
|
|
|
|
def create_hash_ctx(self, algorithm):
|
|
return _HashContext(self, algorithm)
|
|
|
|
def cipher_supported(self, cipher, mode):
|
|
if self._fips_enabled and not isinstance(cipher, self._fips_ciphers):
|
|
return False
|
|
try:
|
|
adapter = self._cipher_registry[type(cipher), type(mode)]
|
|
except KeyError:
|
|
return False
|
|
evp_cipher = adapter(self, cipher, mode)
|
|
return self._ffi.NULL != evp_cipher
|
|
|
|
def register_cipher_adapter(self, cipher_cls, mode_cls, adapter):
|
|
if (cipher_cls, mode_cls) in self._cipher_registry:
|
|
raise ValueError(
|
|
"Duplicate registration for: {} {}.".format(
|
|
cipher_cls, mode_cls
|
|
)
|
|
)
|
|
self._cipher_registry[cipher_cls, mode_cls] = adapter
|
|
|
|
def _register_default_ciphers(self):
|
|
for mode_cls in [CBC, CTR, ECB, OFB, CFB, CFB8, GCM]:
|
|
self.register_cipher_adapter(
|
|
AES,
|
|
mode_cls,
|
|
GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
|
|
)
|
|
for mode_cls in [CBC, CTR, ECB, OFB, CFB]:
|
|
self.register_cipher_adapter(
|
|
Camellia,
|
|
mode_cls,
|
|
GetCipherByName("{cipher.name}-{cipher.key_size}-{mode.name}"),
|
|
)
|
|
for mode_cls in [CBC, CFB, CFB8, OFB]:
|
|
self.register_cipher_adapter(
|
|
TripleDES, mode_cls, GetCipherByName("des-ede3-{mode.name}")
|
|
)
|
|
self.register_cipher_adapter(
|
|
TripleDES, ECB, GetCipherByName("des-ede3")
|
|
)
|
|
for mode_cls in [CBC, CFB, OFB, ECB]:
|
|
self.register_cipher_adapter(
|
|
Blowfish, mode_cls, GetCipherByName("bf-{mode.name}")
|
|
)
|
|
for mode_cls in [CBC, CFB, OFB, ECB]:
|
|
self.register_cipher_adapter(
|
|
SEED, mode_cls, GetCipherByName("seed-{mode.name}")
|
|
)
|
|
for cipher_cls, mode_cls in itertools.product(
|
|
[CAST5, IDEA],
|
|
[CBC, OFB, CFB, ECB],
|
|
):
|
|
self.register_cipher_adapter(
|
|
cipher_cls,
|
|
mode_cls,
|
|
GetCipherByName("{cipher.name}-{mode.name}"),
|
|
)
|
|
self.register_cipher_adapter(ARC4, type(None), GetCipherByName("rc4"))
|
|
# We don't actually support RC2, this is just used by some tests.
|
|
self.register_cipher_adapter(_RC2, type(None), GetCipherByName("rc2"))
|
|
self.register_cipher_adapter(
|
|
ChaCha20, type(None), GetCipherByName("chacha20")
|
|
)
|
|
self.register_cipher_adapter(AES, XTS, _get_xts_cipher)
|
|
|
|
def _register_x509_ext_parsers(self):
|
|
ext_handlers = _EXTENSION_HANDLERS_BASE.copy()
|
|
# All revoked extensions are valid single response extensions, see:
|
|
# https://tools.ietf.org/html/rfc6960#section-4.4.5
|
|
singleresp_handlers = _REVOKED_EXTENSION_HANDLERS.copy()
|
|
|
|
if self._lib.Cryptography_HAS_SCT:
|
|
ext_handlers.update(_EXTENSION_HANDLERS_SCT)
|
|
singleresp_handlers.update(_OCSP_SINGLERESP_EXTENSION_HANDLERS_SCT)
|
|
|
|
self._certificate_extension_parser = _X509ExtensionParser(
|
|
self,
|
|
ext_count=self._lib.X509_get_ext_count,
|
|
get_ext=self._lib.X509_get_ext,
|
|
handlers=ext_handlers,
|
|
)
|
|
self._csr_extension_parser = _X509ExtensionParser(
|
|
self,
|
|
ext_count=self._lib.sk_X509_EXTENSION_num,
|
|
get_ext=self._lib.sk_X509_EXTENSION_value,
|
|
handlers=ext_handlers,
|
|
)
|
|
self._revoked_cert_extension_parser = _X509ExtensionParser(
|
|
self,
|
|
ext_count=self._lib.X509_REVOKED_get_ext_count,
|
|
get_ext=self._lib.X509_REVOKED_get_ext,
|
|
handlers=_REVOKED_EXTENSION_HANDLERS,
|
|
)
|
|
self._crl_extension_parser = _X509ExtensionParser(
|
|
self,
|
|
ext_count=self._lib.X509_CRL_get_ext_count,
|
|
get_ext=self._lib.X509_CRL_get_ext,
|
|
handlers=_CRL_EXTENSION_HANDLERS,
|
|
)
|
|
self._ocsp_req_ext_parser = _X509ExtensionParser(
|
|
self,
|
|
ext_count=self._lib.OCSP_REQUEST_get_ext_count,
|
|
get_ext=self._lib.OCSP_REQUEST_get_ext,
|
|
handlers=_OCSP_REQ_EXTENSION_HANDLERS,
|
|
)
|
|
self._ocsp_basicresp_ext_parser = _X509ExtensionParser(
|
|
self,
|
|
ext_count=self._lib.OCSP_BASICRESP_get_ext_count,
|
|
get_ext=self._lib.OCSP_BASICRESP_get_ext,
|
|
handlers=_OCSP_BASICRESP_EXTENSION_HANDLERS,
|
|
)
|
|
self._ocsp_singleresp_ext_parser = _X509ExtensionParser(
|
|
self,
|
|
ext_count=self._lib.OCSP_SINGLERESP_get_ext_count,
|
|
get_ext=self._lib.OCSP_SINGLERESP_get_ext,
|
|
handlers=singleresp_handlers,
|
|
)
|
|
|
|
def _register_x509_encoders(self):
|
|
self._extension_encode_handlers = _EXTENSION_ENCODE_HANDLERS.copy()
|
|
self._crl_extension_encode_handlers = (
|
|
_CRL_EXTENSION_ENCODE_HANDLERS.copy()
|
|
)
|
|
self._crl_entry_extension_encode_handlers = (
|
|
_CRL_ENTRY_EXTENSION_ENCODE_HANDLERS.copy()
|
|
)
|
|
self._ocsp_request_extension_encode_handlers = (
|
|
_OCSP_REQUEST_EXTENSION_ENCODE_HANDLERS.copy()
|
|
)
|
|
self._ocsp_basicresp_extension_encode_handlers = (
|
|
_OCSP_BASICRESP_EXTENSION_ENCODE_HANDLERS.copy()
|
|
)
|
|
|
|
def create_symmetric_encryption_ctx(self, cipher, mode):
|
|
return _CipherContext(self, cipher, mode, _CipherContext._ENCRYPT)
|
|
|
|
def create_symmetric_decryption_ctx(self, cipher, mode):
|
|
return _CipherContext(self, cipher, mode, _CipherContext._DECRYPT)
|
|
|
|
def pbkdf2_hmac_supported(self, algorithm):
|
|
return self.hmac_supported(algorithm)
|
|
|
|
def derive_pbkdf2_hmac(
|
|
self, algorithm, length, salt, iterations, key_material
|
|
):
|
|
buf = self._ffi.new("unsigned char[]", length)
|
|
evp_md = self._evp_md_non_null_from_algorithm(algorithm)
|
|
key_material_ptr = self._ffi.from_buffer(key_material)
|
|
res = self._lib.PKCS5_PBKDF2_HMAC(
|
|
key_material_ptr,
|
|
len(key_material),
|
|
salt,
|
|
len(salt),
|
|
iterations,
|
|
evp_md,
|
|
length,
|
|
buf,
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
return self._ffi.buffer(buf)[:]
|
|
|
|
def _consume_errors(self):
|
|
return binding._consume_errors(self._lib)
|
|
|
|
def _consume_errors_with_text(self):
|
|
return binding._consume_errors_with_text(self._lib)
|
|
|
|
def _bn_to_int(self, bn):
|
|
assert bn != self._ffi.NULL
|
|
|
|
if not six.PY2:
|
|
# Python 3 has constant time from_bytes, so use that.
|
|
bn_num_bytes = self._lib.BN_num_bytes(bn)
|
|
bin_ptr = self._ffi.new("unsigned char[]", bn_num_bytes)
|
|
bin_len = self._lib.BN_bn2bin(bn, bin_ptr)
|
|
# A zero length means the BN has value 0
|
|
self.openssl_assert(bin_len >= 0)
|
|
val = int.from_bytes(self._ffi.buffer(bin_ptr)[:bin_len], "big")
|
|
if self._lib.BN_is_negative(bn):
|
|
val = -val
|
|
return val
|
|
else:
|
|
# Under Python 2 the best we can do is hex()
|
|
hex_cdata = self._lib.BN_bn2hex(bn)
|
|
self.openssl_assert(hex_cdata != self._ffi.NULL)
|
|
hex_str = self._ffi.string(hex_cdata)
|
|
self._lib.OPENSSL_free(hex_cdata)
|
|
return int(hex_str, 16)
|
|
|
|
def _int_to_bn(self, num, bn=None):
|
|
"""
|
|
Converts a python integer to a BIGNUM. The returned BIGNUM will not
|
|
be garbage collected (to support adding them to structs that take
|
|
ownership of the object). Be sure to register it for GC if it will
|
|
be discarded after use.
|
|
"""
|
|
assert bn is None or bn != self._ffi.NULL
|
|
|
|
if bn is None:
|
|
bn = self._ffi.NULL
|
|
|
|
if not six.PY2:
|
|
# Python 3 has constant time to_bytes, so use that.
|
|
|
|
binary = num.to_bytes(int(num.bit_length() / 8.0 + 1), "big")
|
|
bn_ptr = self._lib.BN_bin2bn(binary, len(binary), bn)
|
|
self.openssl_assert(bn_ptr != self._ffi.NULL)
|
|
return bn_ptr
|
|
|
|
else:
|
|
# Under Python 2 the best we can do is hex(), [2:] removes the 0x
|
|
# prefix.
|
|
hex_num = hex(num).rstrip("L")[2:].encode("ascii")
|
|
bn_ptr = self._ffi.new("BIGNUM **")
|
|
bn_ptr[0] = bn
|
|
res = self._lib.BN_hex2bn(bn_ptr, hex_num)
|
|
self.openssl_assert(res != 0)
|
|
self.openssl_assert(bn_ptr[0] != self._ffi.NULL)
|
|
return bn_ptr[0]
|
|
|
|
def generate_rsa_private_key(self, public_exponent, key_size):
|
|
rsa._verify_rsa_parameters(public_exponent, key_size)
|
|
|
|
rsa_cdata = self._lib.RSA_new()
|
|
self.openssl_assert(rsa_cdata != self._ffi.NULL)
|
|
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
|
|
|
|
bn = self._int_to_bn(public_exponent)
|
|
bn = self._ffi.gc(bn, self._lib.BN_free)
|
|
|
|
res = self._lib.RSA_generate_key_ex(
|
|
rsa_cdata, key_size, bn, self._ffi.NULL
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
|
|
|
|
return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
|
|
|
|
def generate_rsa_parameters_supported(self, public_exponent, key_size):
|
|
return (
|
|
public_exponent >= 3
|
|
and public_exponent & 1 != 0
|
|
and key_size >= 512
|
|
)
|
|
|
|
def load_rsa_private_numbers(self, numbers):
|
|
rsa._check_private_key_components(
|
|
numbers.p,
|
|
numbers.q,
|
|
numbers.d,
|
|
numbers.dmp1,
|
|
numbers.dmq1,
|
|
numbers.iqmp,
|
|
numbers.public_numbers.e,
|
|
numbers.public_numbers.n,
|
|
)
|
|
rsa_cdata = self._lib.RSA_new()
|
|
self.openssl_assert(rsa_cdata != self._ffi.NULL)
|
|
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
|
|
p = self._int_to_bn(numbers.p)
|
|
q = self._int_to_bn(numbers.q)
|
|
d = self._int_to_bn(numbers.d)
|
|
dmp1 = self._int_to_bn(numbers.dmp1)
|
|
dmq1 = self._int_to_bn(numbers.dmq1)
|
|
iqmp = self._int_to_bn(numbers.iqmp)
|
|
e = self._int_to_bn(numbers.public_numbers.e)
|
|
n = self._int_to_bn(numbers.public_numbers.n)
|
|
res = self._lib.RSA_set0_factors(rsa_cdata, p, q)
|
|
self.openssl_assert(res == 1)
|
|
res = self._lib.RSA_set0_key(rsa_cdata, n, e, d)
|
|
self.openssl_assert(res == 1)
|
|
res = self._lib.RSA_set0_crt_params(rsa_cdata, dmp1, dmq1, iqmp)
|
|
self.openssl_assert(res == 1)
|
|
evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
|
|
|
|
return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
|
|
|
|
def load_rsa_public_numbers(self, numbers):
|
|
rsa._check_public_key_components(numbers.e, numbers.n)
|
|
rsa_cdata = self._lib.RSA_new()
|
|
self.openssl_assert(rsa_cdata != self._ffi.NULL)
|
|
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
|
|
e = self._int_to_bn(numbers.e)
|
|
n = self._int_to_bn(numbers.n)
|
|
res = self._lib.RSA_set0_key(rsa_cdata, n, e, self._ffi.NULL)
|
|
self.openssl_assert(res == 1)
|
|
evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
|
|
|
|
return _RSAPublicKey(self, rsa_cdata, evp_pkey)
|
|
|
|
def _create_evp_pkey_gc(self):
|
|
evp_pkey = self._lib.EVP_PKEY_new()
|
|
self.openssl_assert(evp_pkey != self._ffi.NULL)
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
return evp_pkey
|
|
|
|
def _rsa_cdata_to_evp_pkey(self, rsa_cdata):
|
|
evp_pkey = self._create_evp_pkey_gc()
|
|
res = self._lib.EVP_PKEY_set1_RSA(evp_pkey, rsa_cdata)
|
|
self.openssl_assert(res == 1)
|
|
return evp_pkey
|
|
|
|
def _bytes_to_bio(self, data):
|
|
"""
|
|
Return a _MemoryBIO namedtuple of (BIO, char*).
|
|
|
|
The char* is the storage for the BIO and it must stay alive until the
|
|
BIO is finished with.
|
|
"""
|
|
data_ptr = self._ffi.from_buffer(data)
|
|
bio = self._lib.BIO_new_mem_buf(data_ptr, len(data))
|
|
self.openssl_assert(bio != self._ffi.NULL)
|
|
|
|
return _MemoryBIO(self._ffi.gc(bio, self._lib.BIO_free), data_ptr)
|
|
|
|
def _create_mem_bio_gc(self):
|
|
"""
|
|
Creates an empty memory BIO.
|
|
"""
|
|
bio_method = self._lib.BIO_s_mem()
|
|
self.openssl_assert(bio_method != self._ffi.NULL)
|
|
bio = self._lib.BIO_new(bio_method)
|
|
self.openssl_assert(bio != self._ffi.NULL)
|
|
bio = self._ffi.gc(bio, self._lib.BIO_free)
|
|
return bio
|
|
|
|
def _read_mem_bio(self, bio):
|
|
"""
|
|
Reads a memory BIO. This only works on memory BIOs.
|
|
"""
|
|
buf = self._ffi.new("char **")
|
|
buf_len = self._lib.BIO_get_mem_data(bio, buf)
|
|
self.openssl_assert(buf_len > 0)
|
|
self.openssl_assert(buf[0] != self._ffi.NULL)
|
|
bio_data = self._ffi.buffer(buf[0], buf_len)[:]
|
|
return bio_data
|
|
|
|
def _evp_pkey_to_private_key(self, evp_pkey):
|
|
"""
|
|
Return the appropriate type of PrivateKey given an evp_pkey cdata
|
|
pointer.
|
|
"""
|
|
|
|
key_type = self._lib.EVP_PKEY_id(evp_pkey)
|
|
|
|
if key_type == self._lib.EVP_PKEY_RSA:
|
|
rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
|
|
self.openssl_assert(rsa_cdata != self._ffi.NULL)
|
|
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
|
|
return _RSAPrivateKey(self, rsa_cdata, evp_pkey)
|
|
elif key_type == self._lib.EVP_PKEY_DSA:
|
|
dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
|
|
self.openssl_assert(dsa_cdata != self._ffi.NULL)
|
|
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
|
|
return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
|
|
elif key_type == self._lib.EVP_PKEY_EC:
|
|
ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
|
|
self.openssl_assert(ec_cdata != self._ffi.NULL)
|
|
ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
|
|
return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
|
|
elif key_type in self._dh_types:
|
|
dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
|
|
self.openssl_assert(dh_cdata != self._ffi.NULL)
|
|
dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
|
|
return _DHPrivateKey(self, dh_cdata, evp_pkey)
|
|
elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
|
|
# EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
|
|
return _Ed25519PrivateKey(self, evp_pkey)
|
|
elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
|
|
# EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
|
|
return _X448PrivateKey(self, evp_pkey)
|
|
elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
|
|
# EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
|
|
return _X25519PrivateKey(self, evp_pkey)
|
|
elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
|
|
# EVP_PKEY_ED448 is not present in OpenSSL < 1.1.1
|
|
return _Ed448PrivateKey(self, evp_pkey)
|
|
else:
|
|
raise UnsupportedAlgorithm("Unsupported key type.")
|
|
|
|
def _evp_pkey_to_public_key(self, evp_pkey):
|
|
"""
|
|
Return the appropriate type of PublicKey given an evp_pkey cdata
|
|
pointer.
|
|
"""
|
|
|
|
key_type = self._lib.EVP_PKEY_id(evp_pkey)
|
|
|
|
if key_type == self._lib.EVP_PKEY_RSA:
|
|
rsa_cdata = self._lib.EVP_PKEY_get1_RSA(evp_pkey)
|
|
self.openssl_assert(rsa_cdata != self._ffi.NULL)
|
|
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
|
|
return _RSAPublicKey(self, rsa_cdata, evp_pkey)
|
|
elif key_type == self._lib.EVP_PKEY_DSA:
|
|
dsa_cdata = self._lib.EVP_PKEY_get1_DSA(evp_pkey)
|
|
self.openssl_assert(dsa_cdata != self._ffi.NULL)
|
|
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
|
|
return _DSAPublicKey(self, dsa_cdata, evp_pkey)
|
|
elif key_type == self._lib.EVP_PKEY_EC:
|
|
ec_cdata = self._lib.EVP_PKEY_get1_EC_KEY(evp_pkey)
|
|
self.openssl_assert(ec_cdata != self._ffi.NULL)
|
|
ec_cdata = self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
|
|
return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
|
|
elif key_type in self._dh_types:
|
|
dh_cdata = self._lib.EVP_PKEY_get1_DH(evp_pkey)
|
|
self.openssl_assert(dh_cdata != self._ffi.NULL)
|
|
dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
|
|
return _DHPublicKey(self, dh_cdata, evp_pkey)
|
|
elif key_type == getattr(self._lib, "EVP_PKEY_ED25519", None):
|
|
# EVP_PKEY_ED25519 is not present in OpenSSL < 1.1.1
|
|
return _Ed25519PublicKey(self, evp_pkey)
|
|
elif key_type == getattr(self._lib, "EVP_PKEY_X448", None):
|
|
# EVP_PKEY_X448 is not present in OpenSSL < 1.1.1
|
|
return _X448PublicKey(self, evp_pkey)
|
|
elif key_type == getattr(self._lib, "EVP_PKEY_X25519", None):
|
|
# EVP_PKEY_X25519 is not present in OpenSSL < 1.1.0
|
|
return _X25519PublicKey(self, evp_pkey)
|
|
elif key_type == getattr(self._lib, "EVP_PKEY_ED448", None):
|
|
# EVP_PKEY_X25519 is not present in OpenSSL < 1.1.1
|
|
return _Ed448PublicKey(self, evp_pkey)
|
|
else:
|
|
raise UnsupportedAlgorithm("Unsupported key type.")
|
|
|
|
def _oaep_hash_supported(self, algorithm):
|
|
if self._lib.Cryptography_HAS_RSA_OAEP_MD:
|
|
return isinstance(
|
|
algorithm,
|
|
(
|
|
hashes.SHA1,
|
|
hashes.SHA224,
|
|
hashes.SHA256,
|
|
hashes.SHA384,
|
|
hashes.SHA512,
|
|
),
|
|
)
|
|
else:
|
|
return isinstance(algorithm, hashes.SHA1)
|
|
|
|
def rsa_padding_supported(self, padding):
|
|
if isinstance(padding, PKCS1v15):
|
|
return True
|
|
elif isinstance(padding, PSS) and isinstance(padding._mgf, MGF1):
|
|
return self.hash_supported(padding._mgf._algorithm)
|
|
elif isinstance(padding, OAEP) and isinstance(padding._mgf, MGF1):
|
|
return (
|
|
self._oaep_hash_supported(padding._mgf._algorithm)
|
|
and self._oaep_hash_supported(padding._algorithm)
|
|
and (
|
|
(padding._label is None or len(padding._label) == 0)
|
|
or self._lib.Cryptography_HAS_RSA_OAEP_LABEL == 1
|
|
)
|
|
)
|
|
else:
|
|
return False
|
|
|
|
def generate_dsa_parameters(self, key_size):
|
|
if key_size not in (1024, 2048, 3072, 4096):
|
|
raise ValueError(
|
|
"Key size must be 1024, 2048, 3072, or 4096 bits."
|
|
)
|
|
|
|
ctx = self._lib.DSA_new()
|
|
self.openssl_assert(ctx != self._ffi.NULL)
|
|
ctx = self._ffi.gc(ctx, self._lib.DSA_free)
|
|
|
|
res = self._lib.DSA_generate_parameters_ex(
|
|
ctx,
|
|
key_size,
|
|
self._ffi.NULL,
|
|
0,
|
|
self._ffi.NULL,
|
|
self._ffi.NULL,
|
|
self._ffi.NULL,
|
|
)
|
|
|
|
self.openssl_assert(res == 1)
|
|
|
|
return _DSAParameters(self, ctx)
|
|
|
|
def generate_dsa_private_key(self, parameters):
|
|
ctx = self._lib.DSAparams_dup(parameters._dsa_cdata)
|
|
self.openssl_assert(ctx != self._ffi.NULL)
|
|
ctx = self._ffi.gc(ctx, self._lib.DSA_free)
|
|
self._lib.DSA_generate_key(ctx)
|
|
evp_pkey = self._dsa_cdata_to_evp_pkey(ctx)
|
|
|
|
return _DSAPrivateKey(self, ctx, evp_pkey)
|
|
|
|
def generate_dsa_private_key_and_parameters(self, key_size):
|
|
parameters = self.generate_dsa_parameters(key_size)
|
|
return self.generate_dsa_private_key(parameters)
|
|
|
|
def _dsa_cdata_set_values(self, dsa_cdata, p, q, g, pub_key, priv_key):
|
|
res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
|
|
self.openssl_assert(res == 1)
|
|
res = self._lib.DSA_set0_key(dsa_cdata, pub_key, priv_key)
|
|
self.openssl_assert(res == 1)
|
|
|
|
def load_dsa_private_numbers(self, numbers):
|
|
dsa._check_dsa_private_numbers(numbers)
|
|
parameter_numbers = numbers.public_numbers.parameter_numbers
|
|
|
|
dsa_cdata = self._lib.DSA_new()
|
|
self.openssl_assert(dsa_cdata != self._ffi.NULL)
|
|
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
|
|
|
|
p = self._int_to_bn(parameter_numbers.p)
|
|
q = self._int_to_bn(parameter_numbers.q)
|
|
g = self._int_to_bn(parameter_numbers.g)
|
|
pub_key = self._int_to_bn(numbers.public_numbers.y)
|
|
priv_key = self._int_to_bn(numbers.x)
|
|
self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
|
|
|
|
evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
|
|
|
|
return _DSAPrivateKey(self, dsa_cdata, evp_pkey)
|
|
|
|
def load_dsa_public_numbers(self, numbers):
|
|
dsa._check_dsa_parameters(numbers.parameter_numbers)
|
|
dsa_cdata = self._lib.DSA_new()
|
|
self.openssl_assert(dsa_cdata != self._ffi.NULL)
|
|
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
|
|
|
|
p = self._int_to_bn(numbers.parameter_numbers.p)
|
|
q = self._int_to_bn(numbers.parameter_numbers.q)
|
|
g = self._int_to_bn(numbers.parameter_numbers.g)
|
|
pub_key = self._int_to_bn(numbers.y)
|
|
priv_key = self._ffi.NULL
|
|
self._dsa_cdata_set_values(dsa_cdata, p, q, g, pub_key, priv_key)
|
|
|
|
evp_pkey = self._dsa_cdata_to_evp_pkey(dsa_cdata)
|
|
|
|
return _DSAPublicKey(self, dsa_cdata, evp_pkey)
|
|
|
|
def load_dsa_parameter_numbers(self, numbers):
|
|
dsa._check_dsa_parameters(numbers)
|
|
dsa_cdata = self._lib.DSA_new()
|
|
self.openssl_assert(dsa_cdata != self._ffi.NULL)
|
|
dsa_cdata = self._ffi.gc(dsa_cdata, self._lib.DSA_free)
|
|
|
|
p = self._int_to_bn(numbers.p)
|
|
q = self._int_to_bn(numbers.q)
|
|
g = self._int_to_bn(numbers.g)
|
|
res = self._lib.DSA_set0_pqg(dsa_cdata, p, q, g)
|
|
self.openssl_assert(res == 1)
|
|
|
|
return _DSAParameters(self, dsa_cdata)
|
|
|
|
def _dsa_cdata_to_evp_pkey(self, dsa_cdata):
|
|
evp_pkey = self._create_evp_pkey_gc()
|
|
res = self._lib.EVP_PKEY_set1_DSA(evp_pkey, dsa_cdata)
|
|
self.openssl_assert(res == 1)
|
|
return evp_pkey
|
|
|
|
def dsa_hash_supported(self, algorithm):
|
|
return self.hash_supported(algorithm)
|
|
|
|
def dsa_parameters_supported(self, p, q, g):
|
|
return True
|
|
|
|
def cmac_algorithm_supported(self, algorithm):
|
|
return self.cipher_supported(
|
|
algorithm, CBC(b"\x00" * algorithm.block_size)
|
|
)
|
|
|
|
def create_cmac_ctx(self, algorithm):
|
|
return _CMACContext(self, algorithm)
|
|
|
|
def _x509_check_signature_params(self, private_key, algorithm):
|
|
if isinstance(
|
|
private_key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)
|
|
):
|
|
if algorithm is not None:
|
|
raise ValueError(
|
|
"algorithm must be None when signing via ed25519 or ed448"
|
|
)
|
|
elif not isinstance(
|
|
private_key,
|
|
(rsa.RSAPrivateKey, dsa.DSAPrivateKey, ec.EllipticCurvePrivateKey),
|
|
):
|
|
raise TypeError(
|
|
"Key must be an rsa, dsa, ec, ed25519, or ed448 private key."
|
|
)
|
|
elif not isinstance(algorithm, hashes.HashAlgorithm):
|
|
raise TypeError("Algorithm must be a registered hash algorithm.")
|
|
elif isinstance(algorithm, hashes.MD5) and not isinstance(
|
|
private_key, rsa.RSAPrivateKey
|
|
):
|
|
raise ValueError(
|
|
"MD5 hash algorithm is only supported with RSA keys"
|
|
)
|
|
|
|
def create_x509_csr(self, builder, private_key, algorithm):
|
|
if not isinstance(builder, x509.CertificateSigningRequestBuilder):
|
|
raise TypeError("Builder type mismatch.")
|
|
self._x509_check_signature_params(private_key, algorithm)
|
|
|
|
# Resolve the signature algorithm.
|
|
evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
|
|
|
|
# Create an empty request.
|
|
x509_req = self._lib.X509_REQ_new()
|
|
self.openssl_assert(x509_req != self._ffi.NULL)
|
|
x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
|
|
|
|
# Set x509 version.
|
|
res = self._lib.X509_REQ_set_version(x509_req, x509.Version.v1.value)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Set subject name.
|
|
res = self._lib.X509_REQ_set_subject_name(
|
|
x509_req, _encode_name_gc(self, builder._subject_name)
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Set subject public key.
|
|
public_key = private_key.public_key()
|
|
res = self._lib.X509_REQ_set_pubkey(x509_req, public_key._evp_pkey)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Add extensions.
|
|
sk_extension = self._lib.sk_X509_EXTENSION_new_null()
|
|
self.openssl_assert(sk_extension != self._ffi.NULL)
|
|
sk_extension = self._ffi.gc(
|
|
sk_extension,
|
|
lambda x: self._lib.sk_X509_EXTENSION_pop_free(
|
|
x,
|
|
self._ffi.addressof(
|
|
self._lib._original_lib, "X509_EXTENSION_free"
|
|
),
|
|
),
|
|
)
|
|
# Don't GC individual extensions because the memory is owned by
|
|
# sk_extensions and will be freed along with it.
|
|
self._create_x509_extensions(
|
|
extensions=builder._extensions,
|
|
handlers=self._extension_encode_handlers,
|
|
x509_obj=sk_extension,
|
|
add_func=self._lib.sk_X509_EXTENSION_insert,
|
|
gc=False,
|
|
)
|
|
res = self._lib.X509_REQ_add_extensions(x509_req, sk_extension)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Add attributes (all bytes encoded as ASN1 UTF8_STRING)
|
|
for attr_oid, attr_val in builder._attributes:
|
|
obj = _txt2obj_gc(self, attr_oid.dotted_string)
|
|
res = self._lib.X509_REQ_add1_attr_by_OBJ(
|
|
x509_req,
|
|
obj,
|
|
x509.name._ASN1Type.UTF8String.value,
|
|
attr_val,
|
|
len(attr_val),
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Sign the request using the requester's private key.
|
|
res = self._lib.X509_REQ_sign(x509_req, private_key._evp_pkey, evp_md)
|
|
if res == 0:
|
|
errors = self._consume_errors_with_text()
|
|
raise ValueError("Signing failed", errors)
|
|
|
|
return _CertificateSigningRequest(self, x509_req)
|
|
|
|
def create_x509_certificate(self, builder, private_key, algorithm):
|
|
if not isinstance(builder, x509.CertificateBuilder):
|
|
raise TypeError("Builder type mismatch.")
|
|
self._x509_check_signature_params(private_key, algorithm)
|
|
|
|
# Resolve the signature algorithm.
|
|
evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
|
|
|
|
# Create an empty certificate.
|
|
x509_cert = self._lib.X509_new()
|
|
x509_cert = self._ffi.gc(x509_cert, self._lib.X509_free)
|
|
|
|
# Set the x509 version.
|
|
res = self._lib.X509_set_version(x509_cert, builder._version.value)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Set the subject's name.
|
|
res = self._lib.X509_set_subject_name(
|
|
x509_cert, _encode_name_gc(self, builder._subject_name)
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Set the subject's public key.
|
|
res = self._lib.X509_set_pubkey(
|
|
x509_cert, builder._public_key._evp_pkey
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Set the certificate serial number.
|
|
serial_number = _encode_asn1_int_gc(self, builder._serial_number)
|
|
res = self._lib.X509_set_serialNumber(x509_cert, serial_number)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Set the "not before" time.
|
|
self._set_asn1_time(
|
|
self._lib.X509_getm_notBefore(x509_cert), builder._not_valid_before
|
|
)
|
|
|
|
# Set the "not after" time.
|
|
self._set_asn1_time(
|
|
self._lib.X509_getm_notAfter(x509_cert), builder._not_valid_after
|
|
)
|
|
|
|
# Add extensions.
|
|
self._create_x509_extensions(
|
|
extensions=builder._extensions,
|
|
handlers=self._extension_encode_handlers,
|
|
x509_obj=x509_cert,
|
|
add_func=self._lib.X509_add_ext,
|
|
gc=True,
|
|
)
|
|
|
|
# Set the issuer name.
|
|
res = self._lib.X509_set_issuer_name(
|
|
x509_cert, _encode_name_gc(self, builder._issuer_name)
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Sign the certificate with the issuer's private key.
|
|
res = self._lib.X509_sign(x509_cert, private_key._evp_pkey, evp_md)
|
|
if res == 0:
|
|
errors = self._consume_errors_with_text()
|
|
raise ValueError("Signing failed", errors)
|
|
|
|
return _Certificate(self, x509_cert)
|
|
|
|
def _evp_md_x509_null_if_eddsa(self, private_key, algorithm):
|
|
if isinstance(
|
|
private_key, (ed25519.Ed25519PrivateKey, ed448.Ed448PrivateKey)
|
|
):
|
|
# OpenSSL requires us to pass NULL for EVP_MD for ed25519/ed448
|
|
return self._ffi.NULL
|
|
else:
|
|
return self._evp_md_non_null_from_algorithm(algorithm)
|
|
|
|
def _set_asn1_time(self, asn1_time, time):
|
|
if time.year >= 2050:
|
|
asn1_str = time.strftime("%Y%m%d%H%M%SZ").encode("ascii")
|
|
else:
|
|
asn1_str = time.strftime("%y%m%d%H%M%SZ").encode("ascii")
|
|
res = self._lib.ASN1_TIME_set_string(asn1_time, asn1_str)
|
|
self.openssl_assert(res == 1)
|
|
|
|
def _create_asn1_time(self, time):
|
|
asn1_time = self._lib.ASN1_TIME_new()
|
|
self.openssl_assert(asn1_time != self._ffi.NULL)
|
|
asn1_time = self._ffi.gc(asn1_time, self._lib.ASN1_TIME_free)
|
|
self._set_asn1_time(asn1_time, time)
|
|
return asn1_time
|
|
|
|
def create_x509_crl(self, builder, private_key, algorithm):
|
|
if not isinstance(builder, x509.CertificateRevocationListBuilder):
|
|
raise TypeError("Builder type mismatch.")
|
|
self._x509_check_signature_params(private_key, algorithm)
|
|
|
|
evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
|
|
|
|
# Create an empty CRL.
|
|
x509_crl = self._lib.X509_CRL_new()
|
|
x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
|
|
|
|
# Set the x509 CRL version. We only support v2 (integer value 1).
|
|
res = self._lib.X509_CRL_set_version(x509_crl, 1)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Set the issuer name.
|
|
res = self._lib.X509_CRL_set_issuer_name(
|
|
x509_crl, _encode_name_gc(self, builder._issuer_name)
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Set the last update time.
|
|
last_update = self._create_asn1_time(builder._last_update)
|
|
res = self._lib.X509_CRL_set_lastUpdate(x509_crl, last_update)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Set the next update time.
|
|
next_update = self._create_asn1_time(builder._next_update)
|
|
res = self._lib.X509_CRL_set_nextUpdate(x509_crl, next_update)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# Add extensions.
|
|
self._create_x509_extensions(
|
|
extensions=builder._extensions,
|
|
handlers=self._crl_extension_encode_handlers,
|
|
x509_obj=x509_crl,
|
|
add_func=self._lib.X509_CRL_add_ext,
|
|
gc=True,
|
|
)
|
|
|
|
# add revoked certificates
|
|
for revoked_cert in builder._revoked_certificates:
|
|
# Duplicating because the X509_CRL takes ownership and will free
|
|
# this memory when X509_CRL_free is called.
|
|
revoked = self._lib.X509_REVOKED_dup(revoked_cert._x509_revoked)
|
|
self.openssl_assert(revoked != self._ffi.NULL)
|
|
res = self._lib.X509_CRL_add0_revoked(x509_crl, revoked)
|
|
self.openssl_assert(res == 1)
|
|
|
|
res = self._lib.X509_CRL_sign(x509_crl, private_key._evp_pkey, evp_md)
|
|
if res == 0:
|
|
errors = self._consume_errors_with_text()
|
|
raise ValueError("Signing failed", errors)
|
|
|
|
return _CertificateRevocationList(self, x509_crl)
|
|
|
|
def _create_x509_extensions(
|
|
self, extensions, handlers, x509_obj, add_func, gc
|
|
):
|
|
for i, extension in enumerate(extensions):
|
|
x509_extension = self._create_x509_extension(handlers, extension)
|
|
self.openssl_assert(x509_extension != self._ffi.NULL)
|
|
|
|
if gc:
|
|
x509_extension = self._ffi.gc(
|
|
x509_extension, self._lib.X509_EXTENSION_free
|
|
)
|
|
res = add_func(x509_obj, x509_extension, i)
|
|
self.openssl_assert(res >= 1)
|
|
|
|
def _create_raw_x509_extension(self, extension, value):
|
|
obj = _txt2obj_gc(self, extension.oid.dotted_string)
|
|
return self._lib.X509_EXTENSION_create_by_OBJ(
|
|
self._ffi.NULL, obj, 1 if extension.critical else 0, value
|
|
)
|
|
|
|
def _create_x509_extension(self, handlers, extension):
|
|
if isinstance(extension.value, x509.UnrecognizedExtension):
|
|
value = _encode_asn1_str_gc(self, extension.value.value)
|
|
return self._create_raw_x509_extension(extension, value)
|
|
elif isinstance(extension.value, x509.TLSFeature):
|
|
asn1 = encode_der(
|
|
SEQUENCE,
|
|
*[
|
|
encode_der(INTEGER, encode_der_integer(x.value))
|
|
for x in extension.value
|
|
]
|
|
)
|
|
value = _encode_asn1_str_gc(self, asn1)
|
|
return self._create_raw_x509_extension(extension, value)
|
|
elif isinstance(extension.value, x509.PrecertPoison):
|
|
value = _encode_asn1_str_gc(self, encode_der(NULL))
|
|
return self._create_raw_x509_extension(extension, value)
|
|
else:
|
|
try:
|
|
encode = handlers[extension.oid]
|
|
except KeyError:
|
|
raise NotImplementedError(
|
|
"Extension not supported: {}".format(extension.oid)
|
|
)
|
|
|
|
ext_struct = encode(self, extension.value)
|
|
nid = self._lib.OBJ_txt2nid(
|
|
extension.oid.dotted_string.encode("ascii")
|
|
)
|
|
self.openssl_assert(nid != self._lib.NID_undef)
|
|
return self._lib.X509V3_EXT_i2d(
|
|
nid, 1 if extension.critical else 0, ext_struct
|
|
)
|
|
|
|
def create_x509_revoked_certificate(self, builder):
|
|
if not isinstance(builder, x509.RevokedCertificateBuilder):
|
|
raise TypeError("Builder type mismatch.")
|
|
|
|
x509_revoked = self._lib.X509_REVOKED_new()
|
|
self.openssl_assert(x509_revoked != self._ffi.NULL)
|
|
x509_revoked = self._ffi.gc(x509_revoked, self._lib.X509_REVOKED_free)
|
|
serial_number = _encode_asn1_int_gc(self, builder._serial_number)
|
|
res = self._lib.X509_REVOKED_set_serialNumber(
|
|
x509_revoked, serial_number
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
rev_date = self._create_asn1_time(builder._revocation_date)
|
|
res = self._lib.X509_REVOKED_set_revocationDate(x509_revoked, rev_date)
|
|
self.openssl_assert(res == 1)
|
|
# add CRL entry extensions
|
|
self._create_x509_extensions(
|
|
extensions=builder._extensions,
|
|
handlers=self._crl_entry_extension_encode_handlers,
|
|
x509_obj=x509_revoked,
|
|
add_func=self._lib.X509_REVOKED_add_ext,
|
|
gc=True,
|
|
)
|
|
return _RevokedCertificate(self, None, x509_revoked)
|
|
|
|
def load_pem_private_key(self, data, password):
|
|
return self._load_key(
|
|
self._lib.PEM_read_bio_PrivateKey,
|
|
self._evp_pkey_to_private_key,
|
|
data,
|
|
password,
|
|
)
|
|
|
|
def load_pem_public_key(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
evp_pkey = self._lib.PEM_read_bio_PUBKEY(
|
|
mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
|
|
)
|
|
if evp_pkey != self._ffi.NULL:
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
return self._evp_pkey_to_public_key(evp_pkey)
|
|
else:
|
|
# It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
|
|
# need to check to see if it is a pure PKCS1 RSA public key (not
|
|
# embedded in a subjectPublicKeyInfo)
|
|
self._consume_errors()
|
|
res = self._lib.BIO_reset(mem_bio.bio)
|
|
self.openssl_assert(res == 1)
|
|
rsa_cdata = self._lib.PEM_read_bio_RSAPublicKey(
|
|
mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
|
|
)
|
|
if rsa_cdata != self._ffi.NULL:
|
|
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
|
|
evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
|
|
return _RSAPublicKey(self, rsa_cdata, evp_pkey)
|
|
else:
|
|
self._handle_key_loading_error()
|
|
|
|
def load_pem_parameters(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
# only DH is supported currently
|
|
dh_cdata = self._lib.PEM_read_bio_DHparams(
|
|
mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
|
|
)
|
|
if dh_cdata != self._ffi.NULL:
|
|
dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
|
|
return _DHParameters(self, dh_cdata)
|
|
else:
|
|
self._handle_key_loading_error()
|
|
|
|
def load_der_private_key(self, data, password):
|
|
# OpenSSL has a function called d2i_AutoPrivateKey that in theory
|
|
# handles this automatically, however it doesn't handle encrypted
|
|
# private keys. Instead we try to load the key two different ways.
|
|
# First we'll try to load it as a traditional key.
|
|
bio_data = self._bytes_to_bio(data)
|
|
key = self._evp_pkey_from_der_traditional_key(bio_data, password)
|
|
if key:
|
|
return self._evp_pkey_to_private_key(key)
|
|
else:
|
|
# Finally we try to load it with the method that handles encrypted
|
|
# PKCS8 properly.
|
|
return self._load_key(
|
|
self._lib.d2i_PKCS8PrivateKey_bio,
|
|
self._evp_pkey_to_private_key,
|
|
data,
|
|
password,
|
|
)
|
|
|
|
def _evp_pkey_from_der_traditional_key(self, bio_data, password):
|
|
key = self._lib.d2i_PrivateKey_bio(bio_data.bio, self._ffi.NULL)
|
|
if key != self._ffi.NULL:
|
|
key = self._ffi.gc(key, self._lib.EVP_PKEY_free)
|
|
if password is not None:
|
|
raise TypeError(
|
|
"Password was given but private key is not encrypted."
|
|
)
|
|
|
|
return key
|
|
else:
|
|
self._consume_errors()
|
|
return None
|
|
|
|
def load_der_public_key(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
evp_pkey = self._lib.d2i_PUBKEY_bio(mem_bio.bio, self._ffi.NULL)
|
|
if evp_pkey != self._ffi.NULL:
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
return self._evp_pkey_to_public_key(evp_pkey)
|
|
else:
|
|
# It's not a (RSA/DSA/ECDSA) subjectPublicKeyInfo, but we still
|
|
# need to check to see if it is a pure PKCS1 RSA public key (not
|
|
# embedded in a subjectPublicKeyInfo)
|
|
self._consume_errors()
|
|
res = self._lib.BIO_reset(mem_bio.bio)
|
|
self.openssl_assert(res == 1)
|
|
rsa_cdata = self._lib.d2i_RSAPublicKey_bio(
|
|
mem_bio.bio, self._ffi.NULL
|
|
)
|
|
if rsa_cdata != self._ffi.NULL:
|
|
rsa_cdata = self._ffi.gc(rsa_cdata, self._lib.RSA_free)
|
|
evp_pkey = self._rsa_cdata_to_evp_pkey(rsa_cdata)
|
|
return _RSAPublicKey(self, rsa_cdata, evp_pkey)
|
|
else:
|
|
self._handle_key_loading_error()
|
|
|
|
def load_der_parameters(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
dh_cdata = self._lib.d2i_DHparams_bio(mem_bio.bio, self._ffi.NULL)
|
|
if dh_cdata != self._ffi.NULL:
|
|
dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
|
|
return _DHParameters(self, dh_cdata)
|
|
elif self._lib.Cryptography_HAS_EVP_PKEY_DHX:
|
|
# We check to see if the is dhx.
|
|
self._consume_errors()
|
|
res = self._lib.BIO_reset(mem_bio.bio)
|
|
self.openssl_assert(res == 1)
|
|
dh_cdata = self._lib.Cryptography_d2i_DHxparams_bio(
|
|
mem_bio.bio, self._ffi.NULL
|
|
)
|
|
if dh_cdata != self._ffi.NULL:
|
|
dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
|
|
return _DHParameters(self, dh_cdata)
|
|
|
|
self._handle_key_loading_error()
|
|
|
|
def load_pem_x509_certificate(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
x509 = self._lib.PEM_read_bio_X509(
|
|
mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
|
|
)
|
|
if x509 == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError(
|
|
"Unable to load certificate. See https://cryptography.io/en/"
|
|
"latest/faq.html#why-can-t-i-import-my-pem-file for more"
|
|
" details."
|
|
)
|
|
|
|
x509 = self._ffi.gc(x509, self._lib.X509_free)
|
|
return _Certificate(self, x509)
|
|
|
|
def load_der_x509_certificate(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
x509 = self._lib.d2i_X509_bio(mem_bio.bio, self._ffi.NULL)
|
|
if x509 == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError("Unable to load certificate")
|
|
|
|
x509 = self._ffi.gc(x509, self._lib.X509_free)
|
|
return _Certificate(self, x509)
|
|
|
|
def load_pem_x509_crl(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
x509_crl = self._lib.PEM_read_bio_X509_CRL(
|
|
mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
|
|
)
|
|
if x509_crl == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError(
|
|
"Unable to load CRL. See https://cryptography.io/en/la"
|
|
"test/faq.html#why-can-t-i-import-my-pem-file for more"
|
|
" details."
|
|
)
|
|
|
|
x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
|
|
return _CertificateRevocationList(self, x509_crl)
|
|
|
|
def load_der_x509_crl(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
x509_crl = self._lib.d2i_X509_CRL_bio(mem_bio.bio, self._ffi.NULL)
|
|
if x509_crl == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError("Unable to load CRL")
|
|
|
|
x509_crl = self._ffi.gc(x509_crl, self._lib.X509_CRL_free)
|
|
return _CertificateRevocationList(self, x509_crl)
|
|
|
|
def load_pem_x509_csr(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
x509_req = self._lib.PEM_read_bio_X509_REQ(
|
|
mem_bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
|
|
)
|
|
if x509_req == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError(
|
|
"Unable to load request. See https://cryptography.io/en/"
|
|
"latest/faq.html#why-can-t-i-import-my-pem-file for more"
|
|
" details."
|
|
)
|
|
|
|
x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
|
|
return _CertificateSigningRequest(self, x509_req)
|
|
|
|
def load_der_x509_csr(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
x509_req = self._lib.d2i_X509_REQ_bio(mem_bio.bio, self._ffi.NULL)
|
|
if x509_req == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError("Unable to load request")
|
|
|
|
x509_req = self._ffi.gc(x509_req, self._lib.X509_REQ_free)
|
|
return _CertificateSigningRequest(self, x509_req)
|
|
|
|
def _load_key(self, openssl_read_func, convert_func, data, password):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
|
|
userdata = self._ffi.new("CRYPTOGRAPHY_PASSWORD_DATA *")
|
|
if password is not None:
|
|
utils._check_byteslike("password", password)
|
|
password_ptr = self._ffi.from_buffer(password)
|
|
userdata.password = password_ptr
|
|
userdata.length = len(password)
|
|
|
|
evp_pkey = openssl_read_func(
|
|
mem_bio.bio,
|
|
self._ffi.NULL,
|
|
self._ffi.addressof(
|
|
self._lib._original_lib, "Cryptography_pem_password_cb"
|
|
),
|
|
userdata,
|
|
)
|
|
|
|
if evp_pkey == self._ffi.NULL:
|
|
if userdata.error != 0:
|
|
self._consume_errors()
|
|
if userdata.error == -1:
|
|
raise TypeError(
|
|
"Password was not given but private key is encrypted"
|
|
)
|
|
else:
|
|
assert userdata.error == -2
|
|
raise ValueError(
|
|
"Passwords longer than {} bytes are not supported "
|
|
"by this backend.".format(userdata.maxsize - 1)
|
|
)
|
|
else:
|
|
self._handle_key_loading_error()
|
|
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
|
|
if password is not None and userdata.called == 0:
|
|
raise TypeError(
|
|
"Password was given but private key is not encrypted."
|
|
)
|
|
|
|
assert (
|
|
password is not None and userdata.called == 1
|
|
) or password is None
|
|
|
|
return convert_func(evp_pkey)
|
|
|
|
def _handle_key_loading_error(self):
|
|
errors = self._consume_errors()
|
|
|
|
if not errors:
|
|
raise ValueError(
|
|
"Could not deserialize key data. The data may be in an "
|
|
"incorrect format or it may be encrypted with an unsupported "
|
|
"algorithm."
|
|
)
|
|
elif errors[0]._lib_reason_match(
|
|
self._lib.ERR_LIB_EVP, self._lib.EVP_R_BAD_DECRYPT
|
|
) or errors[0]._lib_reason_match(
|
|
self._lib.ERR_LIB_PKCS12,
|
|
self._lib.PKCS12_R_PKCS12_CIPHERFINAL_ERROR,
|
|
):
|
|
raise ValueError("Bad decrypt. Incorrect password?")
|
|
|
|
elif any(
|
|
error._lib_reason_match(
|
|
self._lib.ERR_LIB_EVP,
|
|
self._lib.EVP_R_UNSUPPORTED_PRIVATE_KEY_ALGORITHM,
|
|
)
|
|
for error in errors
|
|
):
|
|
raise ValueError("Unsupported public key algorithm.")
|
|
|
|
else:
|
|
raise ValueError(
|
|
"Could not deserialize key data. The data may be in an "
|
|
"incorrect format or it may be encrypted with an unsupported "
|
|
"algorithm."
|
|
)
|
|
|
|
def elliptic_curve_supported(self, curve):
|
|
try:
|
|
curve_nid = self._elliptic_curve_to_nid(curve)
|
|
except UnsupportedAlgorithm:
|
|
curve_nid = self._lib.NID_undef
|
|
|
|
group = self._lib.EC_GROUP_new_by_curve_name(curve_nid)
|
|
|
|
if group == self._ffi.NULL:
|
|
self._consume_errors()
|
|
return False
|
|
else:
|
|
self.openssl_assert(curve_nid != self._lib.NID_undef)
|
|
self._lib.EC_GROUP_free(group)
|
|
return True
|
|
|
|
def elliptic_curve_signature_algorithm_supported(
|
|
self, signature_algorithm, curve
|
|
):
|
|
# We only support ECDSA right now.
|
|
if not isinstance(signature_algorithm, ec.ECDSA):
|
|
return False
|
|
|
|
return self.elliptic_curve_supported(curve)
|
|
|
|
def generate_elliptic_curve_private_key(self, curve):
|
|
"""
|
|
Generate a new private key on the named curve.
|
|
"""
|
|
|
|
if self.elliptic_curve_supported(curve):
|
|
ec_cdata = self._ec_key_new_by_curve(curve)
|
|
|
|
res = self._lib.EC_KEY_generate_key(ec_cdata)
|
|
self.openssl_assert(res == 1)
|
|
|
|
evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
|
|
|
|
return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
|
|
else:
|
|
raise UnsupportedAlgorithm(
|
|
"Backend object does not support {}.".format(curve.name),
|
|
_Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
|
|
)
|
|
|
|
def load_elliptic_curve_private_numbers(self, numbers):
|
|
public = numbers.public_numbers
|
|
|
|
ec_cdata = self._ec_key_new_by_curve(public.curve)
|
|
|
|
private_value = self._ffi.gc(
|
|
self._int_to_bn(numbers.private_value), self._lib.BN_clear_free
|
|
)
|
|
res = self._lib.EC_KEY_set_private_key(ec_cdata, private_value)
|
|
self.openssl_assert(res == 1)
|
|
|
|
ec_cdata = self._ec_key_set_public_key_affine_coordinates(
|
|
ec_cdata, public.x, public.y
|
|
)
|
|
|
|
evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
|
|
|
|
return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
|
|
|
|
def load_elliptic_curve_public_numbers(self, numbers):
|
|
ec_cdata = self._ec_key_new_by_curve(numbers.curve)
|
|
ec_cdata = self._ec_key_set_public_key_affine_coordinates(
|
|
ec_cdata, numbers.x, numbers.y
|
|
)
|
|
evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
|
|
|
|
return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
|
|
|
|
def load_elliptic_curve_public_bytes(self, curve, point_bytes):
|
|
ec_cdata = self._ec_key_new_by_curve(curve)
|
|
group = self._lib.EC_KEY_get0_group(ec_cdata)
|
|
self.openssl_assert(group != self._ffi.NULL)
|
|
point = self._lib.EC_POINT_new(group)
|
|
self.openssl_assert(point != self._ffi.NULL)
|
|
point = self._ffi.gc(point, self._lib.EC_POINT_free)
|
|
with self._tmp_bn_ctx() as bn_ctx:
|
|
res = self._lib.EC_POINT_oct2point(
|
|
group, point, point_bytes, len(point_bytes), bn_ctx
|
|
)
|
|
if res != 1:
|
|
self._consume_errors()
|
|
raise ValueError("Invalid public bytes for the given curve")
|
|
|
|
res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
|
|
self.openssl_assert(res == 1)
|
|
evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
|
|
return _EllipticCurvePublicKey(self, ec_cdata, evp_pkey)
|
|
|
|
def derive_elliptic_curve_private_key(self, private_value, curve):
|
|
ec_cdata = self._ec_key_new_by_curve(curve)
|
|
|
|
get_func, group = self._ec_key_determine_group_get_func(ec_cdata)
|
|
|
|
point = self._lib.EC_POINT_new(group)
|
|
self.openssl_assert(point != self._ffi.NULL)
|
|
point = self._ffi.gc(point, self._lib.EC_POINT_free)
|
|
|
|
value = self._int_to_bn(private_value)
|
|
value = self._ffi.gc(value, self._lib.BN_clear_free)
|
|
|
|
with self._tmp_bn_ctx() as bn_ctx:
|
|
res = self._lib.EC_POINT_mul(
|
|
group, point, value, self._ffi.NULL, self._ffi.NULL, bn_ctx
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
|
|
bn_x = self._lib.BN_CTX_get(bn_ctx)
|
|
bn_y = self._lib.BN_CTX_get(bn_ctx)
|
|
|
|
res = get_func(group, point, bn_x, bn_y, bn_ctx)
|
|
self.openssl_assert(res == 1)
|
|
|
|
res = self._lib.EC_KEY_set_public_key(ec_cdata, point)
|
|
self.openssl_assert(res == 1)
|
|
private = self._int_to_bn(private_value)
|
|
private = self._ffi.gc(private, self._lib.BN_clear_free)
|
|
res = self._lib.EC_KEY_set_private_key(ec_cdata, private)
|
|
self.openssl_assert(res == 1)
|
|
|
|
evp_pkey = self._ec_cdata_to_evp_pkey(ec_cdata)
|
|
|
|
return _EllipticCurvePrivateKey(self, ec_cdata, evp_pkey)
|
|
|
|
def _ec_key_new_by_curve(self, curve):
|
|
curve_nid = self._elliptic_curve_to_nid(curve)
|
|
return self._ec_key_new_by_curve_nid(curve_nid)
|
|
|
|
def _ec_key_new_by_curve_nid(self, curve_nid):
|
|
ec_cdata = self._lib.EC_KEY_new_by_curve_name(curve_nid)
|
|
self.openssl_assert(ec_cdata != self._ffi.NULL)
|
|
return self._ffi.gc(ec_cdata, self._lib.EC_KEY_free)
|
|
|
|
def load_der_ocsp_request(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
request = self._lib.d2i_OCSP_REQUEST_bio(mem_bio.bio, self._ffi.NULL)
|
|
if request == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError("Unable to load OCSP request")
|
|
|
|
request = self._ffi.gc(request, self._lib.OCSP_REQUEST_free)
|
|
return _OCSPRequest(self, request)
|
|
|
|
def load_der_ocsp_response(self, data):
|
|
mem_bio = self._bytes_to_bio(data)
|
|
response = self._lib.d2i_OCSP_RESPONSE_bio(mem_bio.bio, self._ffi.NULL)
|
|
if response == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError("Unable to load OCSP response")
|
|
|
|
response = self._ffi.gc(response, self._lib.OCSP_RESPONSE_free)
|
|
return _OCSPResponse(self, response)
|
|
|
|
def create_ocsp_request(self, builder):
|
|
ocsp_req = self._lib.OCSP_REQUEST_new()
|
|
self.openssl_assert(ocsp_req != self._ffi.NULL)
|
|
ocsp_req = self._ffi.gc(ocsp_req, self._lib.OCSP_REQUEST_free)
|
|
cert, issuer, algorithm = builder._request
|
|
evp_md = self._evp_md_non_null_from_algorithm(algorithm)
|
|
certid = self._lib.OCSP_cert_to_id(evp_md, cert._x509, issuer._x509)
|
|
self.openssl_assert(certid != self._ffi.NULL)
|
|
onereq = self._lib.OCSP_request_add0_id(ocsp_req, certid)
|
|
self.openssl_assert(onereq != self._ffi.NULL)
|
|
self._create_x509_extensions(
|
|
extensions=builder._extensions,
|
|
handlers=self._ocsp_request_extension_encode_handlers,
|
|
x509_obj=ocsp_req,
|
|
add_func=self._lib.OCSP_REQUEST_add_ext,
|
|
gc=True,
|
|
)
|
|
return _OCSPRequest(self, ocsp_req)
|
|
|
|
def _create_ocsp_basic_response(self, builder, private_key, algorithm):
|
|
self._x509_check_signature_params(private_key, algorithm)
|
|
|
|
basic = self._lib.OCSP_BASICRESP_new()
|
|
self.openssl_assert(basic != self._ffi.NULL)
|
|
basic = self._ffi.gc(basic, self._lib.OCSP_BASICRESP_free)
|
|
evp_md = self._evp_md_non_null_from_algorithm(
|
|
builder._response._algorithm
|
|
)
|
|
certid = self._lib.OCSP_cert_to_id(
|
|
evp_md,
|
|
builder._response._cert._x509,
|
|
builder._response._issuer._x509,
|
|
)
|
|
self.openssl_assert(certid != self._ffi.NULL)
|
|
certid = self._ffi.gc(certid, self._lib.OCSP_CERTID_free)
|
|
if builder._response._revocation_reason is None:
|
|
reason = -1
|
|
else:
|
|
reason = _CRL_ENTRY_REASON_ENUM_TO_CODE[
|
|
builder._response._revocation_reason
|
|
]
|
|
if builder._response._revocation_time is None:
|
|
rev_time = self._ffi.NULL
|
|
else:
|
|
rev_time = self._create_asn1_time(
|
|
builder._response._revocation_time
|
|
)
|
|
|
|
next_update = self._ffi.NULL
|
|
if builder._response._next_update is not None:
|
|
next_update = self._create_asn1_time(
|
|
builder._response._next_update
|
|
)
|
|
|
|
this_update = self._create_asn1_time(builder._response._this_update)
|
|
|
|
res = self._lib.OCSP_basic_add1_status(
|
|
basic,
|
|
certid,
|
|
builder._response._cert_status.value,
|
|
reason,
|
|
rev_time,
|
|
this_update,
|
|
next_update,
|
|
)
|
|
self.openssl_assert(res != self._ffi.NULL)
|
|
# okay, now sign the basic structure
|
|
evp_md = self._evp_md_x509_null_if_eddsa(private_key, algorithm)
|
|
responder_cert, responder_encoding = builder._responder_id
|
|
flags = self._lib.OCSP_NOCERTS
|
|
if responder_encoding is ocsp.OCSPResponderEncoding.HASH:
|
|
flags |= self._lib.OCSP_RESPID_KEY
|
|
|
|
if builder._certs is not None:
|
|
for cert in builder._certs:
|
|
res = self._lib.OCSP_basic_add1_cert(basic, cert._x509)
|
|
self.openssl_assert(res == 1)
|
|
|
|
self._create_x509_extensions(
|
|
extensions=builder._extensions,
|
|
handlers=self._ocsp_basicresp_extension_encode_handlers,
|
|
x509_obj=basic,
|
|
add_func=self._lib.OCSP_BASICRESP_add_ext,
|
|
gc=True,
|
|
)
|
|
|
|
res = self._lib.OCSP_basic_sign(
|
|
basic,
|
|
responder_cert._x509,
|
|
private_key._evp_pkey,
|
|
evp_md,
|
|
self._ffi.NULL,
|
|
flags,
|
|
)
|
|
if res != 1:
|
|
errors = self._consume_errors_with_text()
|
|
raise ValueError(
|
|
"Error while signing. responder_cert must be signed "
|
|
"by private_key",
|
|
errors,
|
|
)
|
|
|
|
return basic
|
|
|
|
def create_ocsp_response(
|
|
self, response_status, builder, private_key, algorithm
|
|
):
|
|
if response_status is ocsp.OCSPResponseStatus.SUCCESSFUL:
|
|
basic = self._create_ocsp_basic_response(
|
|
builder, private_key, algorithm
|
|
)
|
|
else:
|
|
basic = self._ffi.NULL
|
|
|
|
ocsp_resp = self._lib.OCSP_response_create(
|
|
response_status.value, basic
|
|
)
|
|
self.openssl_assert(ocsp_resp != self._ffi.NULL)
|
|
ocsp_resp = self._ffi.gc(ocsp_resp, self._lib.OCSP_RESPONSE_free)
|
|
return _OCSPResponse(self, ocsp_resp)
|
|
|
|
def elliptic_curve_exchange_algorithm_supported(self, algorithm, curve):
|
|
return self.elliptic_curve_supported(curve) and isinstance(
|
|
algorithm, ec.ECDH
|
|
)
|
|
|
|
def _ec_cdata_to_evp_pkey(self, ec_cdata):
|
|
evp_pkey = self._create_evp_pkey_gc()
|
|
res = self._lib.EVP_PKEY_set1_EC_KEY(evp_pkey, ec_cdata)
|
|
self.openssl_assert(res == 1)
|
|
return evp_pkey
|
|
|
|
def _elliptic_curve_to_nid(self, curve):
|
|
"""
|
|
Get the NID for a curve name.
|
|
"""
|
|
|
|
curve_aliases = {"secp192r1": "prime192v1", "secp256r1": "prime256v1"}
|
|
|
|
curve_name = curve_aliases.get(curve.name, curve.name)
|
|
|
|
curve_nid = self._lib.OBJ_sn2nid(curve_name.encode())
|
|
if curve_nid == self._lib.NID_undef:
|
|
raise UnsupportedAlgorithm(
|
|
"{} is not a supported elliptic curve".format(curve.name),
|
|
_Reasons.UNSUPPORTED_ELLIPTIC_CURVE,
|
|
)
|
|
return curve_nid
|
|
|
|
@contextmanager
|
|
def _tmp_bn_ctx(self):
|
|
bn_ctx = self._lib.BN_CTX_new()
|
|
self.openssl_assert(bn_ctx != self._ffi.NULL)
|
|
bn_ctx = self._ffi.gc(bn_ctx, self._lib.BN_CTX_free)
|
|
self._lib.BN_CTX_start(bn_ctx)
|
|
try:
|
|
yield bn_ctx
|
|
finally:
|
|
self._lib.BN_CTX_end(bn_ctx)
|
|
|
|
def _ec_key_determine_group_get_func(self, ctx):
|
|
"""
|
|
Given an EC_KEY determine the group and what function is required to
|
|
get point coordinates.
|
|
"""
|
|
self.openssl_assert(ctx != self._ffi.NULL)
|
|
|
|
nid_two_field = self._lib.OBJ_sn2nid(b"characteristic-two-field")
|
|
self.openssl_assert(nid_two_field != self._lib.NID_undef)
|
|
|
|
group = self._lib.EC_KEY_get0_group(ctx)
|
|
self.openssl_assert(group != self._ffi.NULL)
|
|
|
|
method = self._lib.EC_GROUP_method_of(group)
|
|
self.openssl_assert(method != self._ffi.NULL)
|
|
|
|
nid = self._lib.EC_METHOD_get_field_type(method)
|
|
self.openssl_assert(nid != self._lib.NID_undef)
|
|
|
|
if nid == nid_two_field and self._lib.Cryptography_HAS_EC2M:
|
|
get_func = self._lib.EC_POINT_get_affine_coordinates_GF2m
|
|
else:
|
|
get_func = self._lib.EC_POINT_get_affine_coordinates_GFp
|
|
|
|
assert get_func
|
|
|
|
return get_func, group
|
|
|
|
def _ec_key_set_public_key_affine_coordinates(self, ctx, x, y):
|
|
"""
|
|
Sets the public key point in the EC_KEY context to the affine x and y
|
|
values.
|
|
"""
|
|
|
|
if x < 0 or y < 0:
|
|
raise ValueError(
|
|
"Invalid EC key. Both x and y must be non-negative."
|
|
)
|
|
|
|
x = self._ffi.gc(self._int_to_bn(x), self._lib.BN_free)
|
|
y = self._ffi.gc(self._int_to_bn(y), self._lib.BN_free)
|
|
res = self._lib.EC_KEY_set_public_key_affine_coordinates(ctx, x, y)
|
|
if res != 1:
|
|
self._consume_errors()
|
|
raise ValueError("Invalid EC key.")
|
|
|
|
return ctx
|
|
|
|
def _private_key_bytes(
|
|
self, encoding, format, encryption_algorithm, key, evp_pkey, cdata
|
|
):
|
|
# validate argument types
|
|
if not isinstance(encoding, serialization.Encoding):
|
|
raise TypeError("encoding must be an item from the Encoding enum")
|
|
if not isinstance(format, serialization.PrivateFormat):
|
|
raise TypeError(
|
|
"format must be an item from the PrivateFormat enum"
|
|
)
|
|
if not isinstance(
|
|
encryption_algorithm, serialization.KeySerializationEncryption
|
|
):
|
|
raise TypeError(
|
|
"Encryption algorithm must be a KeySerializationEncryption "
|
|
"instance"
|
|
)
|
|
|
|
# validate password
|
|
if isinstance(encryption_algorithm, serialization.NoEncryption):
|
|
password = b""
|
|
elif isinstance(
|
|
encryption_algorithm, serialization.BestAvailableEncryption
|
|
):
|
|
password = encryption_algorithm.password
|
|
if len(password) > 1023:
|
|
raise ValueError(
|
|
"Passwords longer than 1023 bytes are not supported by "
|
|
"this backend"
|
|
)
|
|
else:
|
|
raise ValueError("Unsupported encryption type")
|
|
|
|
# PKCS8 + PEM/DER
|
|
if format is serialization.PrivateFormat.PKCS8:
|
|
if encoding is serialization.Encoding.PEM:
|
|
write_bio = self._lib.PEM_write_bio_PKCS8PrivateKey
|
|
elif encoding is serialization.Encoding.DER:
|
|
write_bio = self._lib.i2d_PKCS8PrivateKey_bio
|
|
else:
|
|
raise ValueError("Unsupported encoding for PKCS8")
|
|
return self._private_key_bytes_via_bio(
|
|
write_bio, evp_pkey, password
|
|
)
|
|
|
|
# TraditionalOpenSSL + PEM/DER
|
|
if format is serialization.PrivateFormat.TraditionalOpenSSL:
|
|
if self._fips_enabled and not isinstance(
|
|
encryption_algorithm, serialization.NoEncryption
|
|
):
|
|
raise ValueError(
|
|
"Encrypted traditional OpenSSL format is not "
|
|
"supported in FIPS mode."
|
|
)
|
|
key_type = self._lib.EVP_PKEY_id(evp_pkey)
|
|
|
|
if encoding is serialization.Encoding.PEM:
|
|
if key_type == self._lib.EVP_PKEY_RSA:
|
|
write_bio = self._lib.PEM_write_bio_RSAPrivateKey
|
|
elif key_type == self._lib.EVP_PKEY_DSA:
|
|
write_bio = self._lib.PEM_write_bio_DSAPrivateKey
|
|
elif key_type == self._lib.EVP_PKEY_EC:
|
|
write_bio = self._lib.PEM_write_bio_ECPrivateKey
|
|
else:
|
|
raise ValueError(
|
|
"Unsupported key type for TraditionalOpenSSL"
|
|
)
|
|
return self._private_key_bytes_via_bio(
|
|
write_bio, cdata, password
|
|
)
|
|
|
|
if encoding is serialization.Encoding.DER:
|
|
if password:
|
|
raise ValueError(
|
|
"Encryption is not supported for DER encoded "
|
|
"traditional OpenSSL keys"
|
|
)
|
|
if key_type == self._lib.EVP_PKEY_RSA:
|
|
write_bio = self._lib.i2d_RSAPrivateKey_bio
|
|
elif key_type == self._lib.EVP_PKEY_EC:
|
|
write_bio = self._lib.i2d_ECPrivateKey_bio
|
|
elif key_type == self._lib.EVP_PKEY_DSA:
|
|
write_bio = self._lib.i2d_DSAPrivateKey_bio
|
|
else:
|
|
raise ValueError(
|
|
"Unsupported key type for TraditionalOpenSSL"
|
|
)
|
|
return self._bio_func_output(write_bio, cdata)
|
|
|
|
raise ValueError("Unsupported encoding for TraditionalOpenSSL")
|
|
|
|
# OpenSSH + PEM
|
|
if format is serialization.PrivateFormat.OpenSSH:
|
|
if encoding is serialization.Encoding.PEM:
|
|
return ssh.serialize_ssh_private_key(key, password)
|
|
|
|
raise ValueError(
|
|
"OpenSSH private key format can only be used"
|
|
" with PEM encoding"
|
|
)
|
|
|
|
# Anything that key-specific code was supposed to handle earlier,
|
|
# like Raw.
|
|
raise ValueError("format is invalid with this key")
|
|
|
|
def _private_key_bytes_via_bio(self, write_bio, evp_pkey, password):
|
|
if not password:
|
|
evp_cipher = self._ffi.NULL
|
|
else:
|
|
# This is a curated value that we will update over time.
|
|
evp_cipher = self._lib.EVP_get_cipherbyname(b"aes-256-cbc")
|
|
|
|
return self._bio_func_output(
|
|
write_bio,
|
|
evp_pkey,
|
|
evp_cipher,
|
|
password,
|
|
len(password),
|
|
self._ffi.NULL,
|
|
self._ffi.NULL,
|
|
)
|
|
|
|
def _bio_func_output(self, write_bio, *args):
|
|
bio = self._create_mem_bio_gc()
|
|
res = write_bio(bio, *args)
|
|
self.openssl_assert(res == 1)
|
|
return self._read_mem_bio(bio)
|
|
|
|
def _public_key_bytes(self, encoding, format, key, evp_pkey, cdata):
|
|
if not isinstance(encoding, serialization.Encoding):
|
|
raise TypeError("encoding must be an item from the Encoding enum")
|
|
if not isinstance(format, serialization.PublicFormat):
|
|
raise TypeError(
|
|
"format must be an item from the PublicFormat enum"
|
|
)
|
|
|
|
# SubjectPublicKeyInfo + PEM/DER
|
|
if format is serialization.PublicFormat.SubjectPublicKeyInfo:
|
|
if encoding is serialization.Encoding.PEM:
|
|
write_bio = self._lib.PEM_write_bio_PUBKEY
|
|
elif encoding is serialization.Encoding.DER:
|
|
write_bio = self._lib.i2d_PUBKEY_bio
|
|
else:
|
|
raise ValueError(
|
|
"SubjectPublicKeyInfo works only with PEM or DER encoding"
|
|
)
|
|
return self._bio_func_output(write_bio, evp_pkey)
|
|
|
|
# PKCS1 + PEM/DER
|
|
if format is serialization.PublicFormat.PKCS1:
|
|
# Only RSA is supported here.
|
|
key_type = self._lib.EVP_PKEY_id(evp_pkey)
|
|
if key_type != self._lib.EVP_PKEY_RSA:
|
|
raise ValueError("PKCS1 format is supported only for RSA keys")
|
|
|
|
if encoding is serialization.Encoding.PEM:
|
|
write_bio = self._lib.PEM_write_bio_RSAPublicKey
|
|
elif encoding is serialization.Encoding.DER:
|
|
write_bio = self._lib.i2d_RSAPublicKey_bio
|
|
else:
|
|
raise ValueError("PKCS1 works only with PEM or DER encoding")
|
|
return self._bio_func_output(write_bio, cdata)
|
|
|
|
# OpenSSH + OpenSSH
|
|
if format is serialization.PublicFormat.OpenSSH:
|
|
if encoding is serialization.Encoding.OpenSSH:
|
|
return ssh.serialize_ssh_public_key(key)
|
|
|
|
raise ValueError(
|
|
"OpenSSH format must be used with OpenSSH encoding"
|
|
)
|
|
|
|
# Anything that key-specific code was supposed to handle earlier,
|
|
# like Raw, CompressedPoint, UncompressedPoint
|
|
raise ValueError("format is invalid with this key")
|
|
|
|
def _parameter_bytes(self, encoding, format, cdata):
|
|
if encoding is serialization.Encoding.OpenSSH:
|
|
raise TypeError("OpenSSH encoding is not supported")
|
|
|
|
# Only DH is supported here currently.
|
|
q = self._ffi.new("BIGNUM **")
|
|
self._lib.DH_get0_pqg(cdata, self._ffi.NULL, q, self._ffi.NULL)
|
|
if encoding is serialization.Encoding.PEM:
|
|
if q[0] != self._ffi.NULL:
|
|
write_bio = self._lib.PEM_write_bio_DHxparams
|
|
else:
|
|
write_bio = self._lib.PEM_write_bio_DHparams
|
|
elif encoding is serialization.Encoding.DER:
|
|
if q[0] != self._ffi.NULL:
|
|
write_bio = self._lib.Cryptography_i2d_DHxparams_bio
|
|
else:
|
|
write_bio = self._lib.i2d_DHparams_bio
|
|
else:
|
|
raise TypeError("encoding must be an item from the Encoding enum")
|
|
|
|
bio = self._create_mem_bio_gc()
|
|
res = write_bio(bio, cdata)
|
|
self.openssl_assert(res == 1)
|
|
return self._read_mem_bio(bio)
|
|
|
|
def generate_dh_parameters(self, generator, key_size):
|
|
if key_size < dh._MIN_MODULUS_SIZE:
|
|
raise ValueError(
|
|
"DH key_size must be at least {} bits".format(
|
|
dh._MIN_MODULUS_SIZE
|
|
)
|
|
)
|
|
|
|
if generator not in (2, 5):
|
|
raise ValueError("DH generator must be 2 or 5")
|
|
|
|
dh_param_cdata = self._lib.DH_new()
|
|
self.openssl_assert(dh_param_cdata != self._ffi.NULL)
|
|
dh_param_cdata = self._ffi.gc(dh_param_cdata, self._lib.DH_free)
|
|
|
|
res = self._lib.DH_generate_parameters_ex(
|
|
dh_param_cdata, key_size, generator, self._ffi.NULL
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
|
|
return _DHParameters(self, dh_param_cdata)
|
|
|
|
def _dh_cdata_to_evp_pkey(self, dh_cdata):
|
|
evp_pkey = self._create_evp_pkey_gc()
|
|
res = self._lib.EVP_PKEY_set1_DH(evp_pkey, dh_cdata)
|
|
self.openssl_assert(res == 1)
|
|
return evp_pkey
|
|
|
|
def generate_dh_private_key(self, parameters):
|
|
dh_key_cdata = _dh_params_dup(parameters._dh_cdata, self)
|
|
|
|
res = self._lib.DH_generate_key(dh_key_cdata)
|
|
self.openssl_assert(res == 1)
|
|
|
|
evp_pkey = self._dh_cdata_to_evp_pkey(dh_key_cdata)
|
|
|
|
return _DHPrivateKey(self, dh_key_cdata, evp_pkey)
|
|
|
|
def generate_dh_private_key_and_parameters(self, generator, key_size):
|
|
return self.generate_dh_private_key(
|
|
self.generate_dh_parameters(generator, key_size)
|
|
)
|
|
|
|
def load_dh_private_numbers(self, numbers):
|
|
parameter_numbers = numbers.public_numbers.parameter_numbers
|
|
|
|
dh_cdata = self._lib.DH_new()
|
|
self.openssl_assert(dh_cdata != self._ffi.NULL)
|
|
dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
|
|
|
|
p = self._int_to_bn(parameter_numbers.p)
|
|
g = self._int_to_bn(parameter_numbers.g)
|
|
|
|
if parameter_numbers.q is not None:
|
|
q = self._int_to_bn(parameter_numbers.q)
|
|
else:
|
|
q = self._ffi.NULL
|
|
|
|
pub_key = self._int_to_bn(numbers.public_numbers.y)
|
|
priv_key = self._int_to_bn(numbers.x)
|
|
|
|
res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
|
|
self.openssl_assert(res == 1)
|
|
|
|
res = self._lib.DH_set0_key(dh_cdata, pub_key, priv_key)
|
|
self.openssl_assert(res == 1)
|
|
|
|
codes = self._ffi.new("int[]", 1)
|
|
res = self._lib.Cryptography_DH_check(dh_cdata, codes)
|
|
self.openssl_assert(res == 1)
|
|
|
|
# DH_check will return DH_NOT_SUITABLE_GENERATOR if p % 24 does not
|
|
# equal 11 when the generator is 2 (a quadratic nonresidue).
|
|
# We want to ignore that error because p % 24 == 23 is also fine.
|
|
# Specifically, g is then a quadratic residue. Within the context of
|
|
# Diffie-Hellman this means it can only generate half the possible
|
|
# values. That sounds bad, but quadratic nonresidues leak a bit of
|
|
# the key to the attacker in exchange for having the full key space
|
|
# available. See: https://crypto.stackexchange.com/questions/12961
|
|
if codes[0] != 0 and not (
|
|
parameter_numbers.g == 2
|
|
and codes[0] ^ self._lib.DH_NOT_SUITABLE_GENERATOR == 0
|
|
):
|
|
raise ValueError("DH private numbers did not pass safety checks.")
|
|
|
|
evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
|
|
|
|
return _DHPrivateKey(self, dh_cdata, evp_pkey)
|
|
|
|
def load_dh_public_numbers(self, numbers):
|
|
dh_cdata = self._lib.DH_new()
|
|
self.openssl_assert(dh_cdata != self._ffi.NULL)
|
|
dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
|
|
|
|
parameter_numbers = numbers.parameter_numbers
|
|
|
|
p = self._int_to_bn(parameter_numbers.p)
|
|
g = self._int_to_bn(parameter_numbers.g)
|
|
|
|
if parameter_numbers.q is not None:
|
|
q = self._int_to_bn(parameter_numbers.q)
|
|
else:
|
|
q = self._ffi.NULL
|
|
|
|
pub_key = self._int_to_bn(numbers.y)
|
|
|
|
res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
|
|
self.openssl_assert(res == 1)
|
|
|
|
res = self._lib.DH_set0_key(dh_cdata, pub_key, self._ffi.NULL)
|
|
self.openssl_assert(res == 1)
|
|
|
|
evp_pkey = self._dh_cdata_to_evp_pkey(dh_cdata)
|
|
|
|
return _DHPublicKey(self, dh_cdata, evp_pkey)
|
|
|
|
def load_dh_parameter_numbers(self, numbers):
|
|
dh_cdata = self._lib.DH_new()
|
|
self.openssl_assert(dh_cdata != self._ffi.NULL)
|
|
dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
|
|
|
|
p = self._int_to_bn(numbers.p)
|
|
g = self._int_to_bn(numbers.g)
|
|
|
|
if numbers.q is not None:
|
|
q = self._int_to_bn(numbers.q)
|
|
else:
|
|
q = self._ffi.NULL
|
|
|
|
res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
|
|
self.openssl_assert(res == 1)
|
|
|
|
return _DHParameters(self, dh_cdata)
|
|
|
|
def dh_parameters_supported(self, p, g, q=None):
|
|
dh_cdata = self._lib.DH_new()
|
|
self.openssl_assert(dh_cdata != self._ffi.NULL)
|
|
dh_cdata = self._ffi.gc(dh_cdata, self._lib.DH_free)
|
|
|
|
p = self._int_to_bn(p)
|
|
g = self._int_to_bn(g)
|
|
|
|
if q is not None:
|
|
q = self._int_to_bn(q)
|
|
else:
|
|
q = self._ffi.NULL
|
|
|
|
res = self._lib.DH_set0_pqg(dh_cdata, p, q, g)
|
|
self.openssl_assert(res == 1)
|
|
|
|
codes = self._ffi.new("int[]", 1)
|
|
res = self._lib.Cryptography_DH_check(dh_cdata, codes)
|
|
self.openssl_assert(res == 1)
|
|
|
|
return codes[0] == 0
|
|
|
|
def dh_x942_serialization_supported(self):
|
|
return self._lib.Cryptography_HAS_EVP_PKEY_DHX == 1
|
|
|
|
def x509_name_bytes(self, name):
|
|
x509_name = _encode_name_gc(self, name)
|
|
pp = self._ffi.new("unsigned char **")
|
|
res = self._lib.i2d_X509_NAME(x509_name, pp)
|
|
self.openssl_assert(pp[0] != self._ffi.NULL)
|
|
pp = self._ffi.gc(
|
|
pp, lambda pointer: self._lib.OPENSSL_free(pointer[0])
|
|
)
|
|
self.openssl_assert(res > 0)
|
|
return self._ffi.buffer(pp[0], res)[:]
|
|
|
|
def x25519_load_public_bytes(self, data):
|
|
# When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
|
|
# switch this to EVP_PKEY_new_raw_public_key
|
|
if len(data) != 32:
|
|
raise ValueError("An X25519 public key is 32 bytes long")
|
|
|
|
evp_pkey = self._create_evp_pkey_gc()
|
|
res = self._lib.EVP_PKEY_set_type(evp_pkey, self._lib.NID_X25519)
|
|
self.openssl_assert(res == 1)
|
|
res = self._lib.EVP_PKEY_set1_tls_encodedpoint(
|
|
evp_pkey, data, len(data)
|
|
)
|
|
self.openssl_assert(res == 1)
|
|
return _X25519PublicKey(self, evp_pkey)
|
|
|
|
def x25519_load_private_bytes(self, data):
|
|
# When we drop support for CRYPTOGRAPHY_OPENSSL_LESS_THAN_111 we can
|
|
# switch this to EVP_PKEY_new_raw_private_key and drop the
|
|
# zeroed_bytearray garbage.
|
|
# OpenSSL only has facilities for loading PKCS8 formatted private
|
|
# keys using the algorithm identifiers specified in
|
|
# https://tools.ietf.org/html/draft-ietf-curdle-pkix-09.
|
|
# This is the standard PKCS8 prefix for a 32 byte X25519 key.
|
|
# The form is:
|
|
# 0:d=0 hl=2 l= 46 cons: SEQUENCE
|
|
# 2:d=1 hl=2 l= 1 prim: INTEGER :00
|
|
# 5:d=1 hl=2 l= 5 cons: SEQUENCE
|
|
# 7:d=2 hl=2 l= 3 prim: OBJECT :1.3.101.110
|
|
# 12:d=1 hl=2 l= 34 prim: OCTET STRING (the key)
|
|
# Of course there's a bit more complexity. In reality OCTET STRING
|
|
# contains an OCTET STRING of length 32! So the last two bytes here
|
|
# are \x04\x20, which is an OCTET STRING of length 32.
|
|
if len(data) != 32:
|
|
raise ValueError("An X25519 private key is 32 bytes long")
|
|
|
|
pkcs8_prefix = b'0.\x02\x01\x000\x05\x06\x03+en\x04"\x04 '
|
|
with self._zeroed_bytearray(48) as ba:
|
|
ba[0:16] = pkcs8_prefix
|
|
ba[16:] = data
|
|
bio = self._bytes_to_bio(ba)
|
|
evp_pkey = self._lib.d2i_PrivateKey_bio(bio.bio, self._ffi.NULL)
|
|
|
|
self.openssl_assert(evp_pkey != self._ffi.NULL)
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
self.openssl_assert(
|
|
self._lib.EVP_PKEY_id(evp_pkey) == self._lib.EVP_PKEY_X25519
|
|
)
|
|
return _X25519PrivateKey(self, evp_pkey)
|
|
|
|
def _evp_pkey_keygen_gc(self, nid):
|
|
evp_pkey_ctx = self._lib.EVP_PKEY_CTX_new_id(nid, self._ffi.NULL)
|
|
self.openssl_assert(evp_pkey_ctx != self._ffi.NULL)
|
|
evp_pkey_ctx = self._ffi.gc(evp_pkey_ctx, self._lib.EVP_PKEY_CTX_free)
|
|
res = self._lib.EVP_PKEY_keygen_init(evp_pkey_ctx)
|
|
self.openssl_assert(res == 1)
|
|
evp_ppkey = self._ffi.new("EVP_PKEY **")
|
|
res = self._lib.EVP_PKEY_keygen(evp_pkey_ctx, evp_ppkey)
|
|
self.openssl_assert(res == 1)
|
|
self.openssl_assert(evp_ppkey[0] != self._ffi.NULL)
|
|
evp_pkey = self._ffi.gc(evp_ppkey[0], self._lib.EVP_PKEY_free)
|
|
return evp_pkey
|
|
|
|
def x25519_generate_key(self):
|
|
evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X25519)
|
|
return _X25519PrivateKey(self, evp_pkey)
|
|
|
|
def x25519_supported(self):
|
|
if self._fips_enabled:
|
|
return False
|
|
return not self._lib.CRYPTOGRAPHY_IS_LIBRESSL
|
|
|
|
def x448_load_public_bytes(self, data):
|
|
if len(data) != 56:
|
|
raise ValueError("An X448 public key is 56 bytes long")
|
|
|
|
evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
|
|
self._lib.NID_X448, self._ffi.NULL, data, len(data)
|
|
)
|
|
self.openssl_assert(evp_pkey != self._ffi.NULL)
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
return _X448PublicKey(self, evp_pkey)
|
|
|
|
def x448_load_private_bytes(self, data):
|
|
if len(data) != 56:
|
|
raise ValueError("An X448 private key is 56 bytes long")
|
|
|
|
data_ptr = self._ffi.from_buffer(data)
|
|
evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
|
|
self._lib.NID_X448, self._ffi.NULL, data_ptr, len(data)
|
|
)
|
|
self.openssl_assert(evp_pkey != self._ffi.NULL)
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
return _X448PrivateKey(self, evp_pkey)
|
|
|
|
def x448_generate_key(self):
|
|
evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_X448)
|
|
return _X448PrivateKey(self, evp_pkey)
|
|
|
|
def x448_supported(self):
|
|
if self._fips_enabled:
|
|
return False
|
|
return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111
|
|
|
|
def ed25519_supported(self):
|
|
if self._fips_enabled:
|
|
return False
|
|
return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
|
|
|
|
def ed25519_load_public_bytes(self, data):
|
|
utils._check_bytes("data", data)
|
|
|
|
if len(data) != ed25519._ED25519_KEY_SIZE:
|
|
raise ValueError("An Ed25519 public key is 32 bytes long")
|
|
|
|
evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
|
|
self._lib.NID_ED25519, self._ffi.NULL, data, len(data)
|
|
)
|
|
self.openssl_assert(evp_pkey != self._ffi.NULL)
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
|
|
return _Ed25519PublicKey(self, evp_pkey)
|
|
|
|
def ed25519_load_private_bytes(self, data):
|
|
if len(data) != ed25519._ED25519_KEY_SIZE:
|
|
raise ValueError("An Ed25519 private key is 32 bytes long")
|
|
|
|
utils._check_byteslike("data", data)
|
|
data_ptr = self._ffi.from_buffer(data)
|
|
evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
|
|
self._lib.NID_ED25519, self._ffi.NULL, data_ptr, len(data)
|
|
)
|
|
self.openssl_assert(evp_pkey != self._ffi.NULL)
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
|
|
return _Ed25519PrivateKey(self, evp_pkey)
|
|
|
|
def ed25519_generate_key(self):
|
|
evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED25519)
|
|
return _Ed25519PrivateKey(self, evp_pkey)
|
|
|
|
def ed448_supported(self):
|
|
if self._fips_enabled:
|
|
return False
|
|
return not self._lib.CRYPTOGRAPHY_OPENSSL_LESS_THAN_111B
|
|
|
|
def ed448_load_public_bytes(self, data):
|
|
utils._check_bytes("data", data)
|
|
if len(data) != _ED448_KEY_SIZE:
|
|
raise ValueError("An Ed448 public key is 57 bytes long")
|
|
|
|
evp_pkey = self._lib.EVP_PKEY_new_raw_public_key(
|
|
self._lib.NID_ED448, self._ffi.NULL, data, len(data)
|
|
)
|
|
self.openssl_assert(evp_pkey != self._ffi.NULL)
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
|
|
return _Ed448PublicKey(self, evp_pkey)
|
|
|
|
def ed448_load_private_bytes(self, data):
|
|
utils._check_byteslike("data", data)
|
|
if len(data) != _ED448_KEY_SIZE:
|
|
raise ValueError("An Ed448 private key is 57 bytes long")
|
|
|
|
data_ptr = self._ffi.from_buffer(data)
|
|
evp_pkey = self._lib.EVP_PKEY_new_raw_private_key(
|
|
self._lib.NID_ED448, self._ffi.NULL, data_ptr, len(data)
|
|
)
|
|
self.openssl_assert(evp_pkey != self._ffi.NULL)
|
|
evp_pkey = self._ffi.gc(evp_pkey, self._lib.EVP_PKEY_free)
|
|
|
|
return _Ed448PrivateKey(self, evp_pkey)
|
|
|
|
def ed448_generate_key(self):
|
|
evp_pkey = self._evp_pkey_keygen_gc(self._lib.NID_ED448)
|
|
return _Ed448PrivateKey(self, evp_pkey)
|
|
|
|
def derive_scrypt(self, key_material, salt, length, n, r, p):
|
|
buf = self._ffi.new("unsigned char[]", length)
|
|
key_material_ptr = self._ffi.from_buffer(key_material)
|
|
res = self._lib.EVP_PBE_scrypt(
|
|
key_material_ptr,
|
|
len(key_material),
|
|
salt,
|
|
len(salt),
|
|
n,
|
|
r,
|
|
p,
|
|
scrypt._MEM_LIMIT,
|
|
buf,
|
|
length,
|
|
)
|
|
if res != 1:
|
|
errors = self._consume_errors_with_text()
|
|
# memory required formula explained here:
|
|
# https://blog.filippo.io/the-scrypt-parameters/
|
|
min_memory = 128 * n * r // (1024 ** 2)
|
|
raise MemoryError(
|
|
"Not enough memory to derive key. These parameters require"
|
|
" {} MB of memory.".format(min_memory),
|
|
errors,
|
|
)
|
|
return self._ffi.buffer(buf)[:]
|
|
|
|
def aead_cipher_supported(self, cipher):
|
|
cipher_name = aead._aead_cipher_name(cipher)
|
|
if self._fips_enabled and cipher_name not in self._fips_aead:
|
|
return False
|
|
return self._lib.EVP_get_cipherbyname(cipher_name) != self._ffi.NULL
|
|
|
|
@contextlib.contextmanager
|
|
def _zeroed_bytearray(self, length):
|
|
"""
|
|
This method creates a bytearray, which we copy data into (hopefully
|
|
also from a mutable buffer that can be dynamically erased!), and then
|
|
zero when we're done.
|
|
"""
|
|
ba = bytearray(length)
|
|
try:
|
|
yield ba
|
|
finally:
|
|
self._zero_data(ba, length)
|
|
|
|
def _zero_data(self, data, length):
|
|
# We clear things this way because at the moment we're not
|
|
# sure of a better way that can guarantee it overwrites the
|
|
# memory of a bytearray and doesn't just replace the underlying char *.
|
|
for i in range(length):
|
|
data[i] = 0
|
|
|
|
@contextlib.contextmanager
|
|
def _zeroed_null_terminated_buf(self, data):
|
|
"""
|
|
This method takes bytes, which can be a bytestring or a mutable
|
|
buffer like a bytearray, and yields a null-terminated version of that
|
|
data. This is required because PKCS12_parse doesn't take a length with
|
|
its password char * and ffi.from_buffer doesn't provide null
|
|
termination. So, to support zeroing the data via bytearray we
|
|
need to build this ridiculous construct that copies the memory, but
|
|
zeroes it after use.
|
|
"""
|
|
if data is None:
|
|
yield self._ffi.NULL
|
|
else:
|
|
data_len = len(data)
|
|
buf = self._ffi.new("char[]", data_len + 1)
|
|
self._ffi.memmove(buf, data, data_len)
|
|
try:
|
|
yield buf
|
|
finally:
|
|
# Cast to a uint8_t * so we can assign by integer
|
|
self._zero_data(self._ffi.cast("uint8_t *", buf), data_len)
|
|
|
|
def load_key_and_certificates_from_pkcs12(self, data, password):
|
|
if password is not None:
|
|
utils._check_byteslike("password", password)
|
|
|
|
bio = self._bytes_to_bio(data)
|
|
p12 = self._lib.d2i_PKCS12_bio(bio.bio, self._ffi.NULL)
|
|
if p12 == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError("Could not deserialize PKCS12 data")
|
|
|
|
p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
|
|
evp_pkey_ptr = self._ffi.new("EVP_PKEY **")
|
|
x509_ptr = self._ffi.new("X509 **")
|
|
sk_x509_ptr = self._ffi.new("Cryptography_STACK_OF_X509 **")
|
|
with self._zeroed_null_terminated_buf(password) as password_buf:
|
|
res = self._lib.PKCS12_parse(
|
|
p12, password_buf, evp_pkey_ptr, x509_ptr, sk_x509_ptr
|
|
)
|
|
|
|
if res == 0:
|
|
self._consume_errors()
|
|
raise ValueError("Invalid password or PKCS12 data")
|
|
|
|
cert = None
|
|
key = None
|
|
additional_certificates = []
|
|
|
|
if evp_pkey_ptr[0] != self._ffi.NULL:
|
|
evp_pkey = self._ffi.gc(evp_pkey_ptr[0], self._lib.EVP_PKEY_free)
|
|
key = self._evp_pkey_to_private_key(evp_pkey)
|
|
|
|
if x509_ptr[0] != self._ffi.NULL:
|
|
x509 = self._ffi.gc(x509_ptr[0], self._lib.X509_free)
|
|
cert = _Certificate(self, x509)
|
|
|
|
if sk_x509_ptr[0] != self._ffi.NULL:
|
|
sk_x509 = self._ffi.gc(sk_x509_ptr[0], self._lib.sk_X509_free)
|
|
num = self._lib.sk_X509_num(sk_x509_ptr[0])
|
|
for i in range(num):
|
|
x509 = self._lib.sk_X509_value(sk_x509, i)
|
|
self.openssl_assert(x509 != self._ffi.NULL)
|
|
x509 = self._ffi.gc(x509, self._lib.X509_free)
|
|
additional_certificates.append(_Certificate(self, x509))
|
|
|
|
return (key, cert, additional_certificates)
|
|
|
|
def serialize_key_and_certificates_to_pkcs12(
|
|
self, name, key, cert, cas, encryption_algorithm
|
|
):
|
|
password = None
|
|
if name is not None:
|
|
utils._check_bytes("name", name)
|
|
|
|
if isinstance(encryption_algorithm, serialization.NoEncryption):
|
|
nid_cert = -1
|
|
nid_key = -1
|
|
pkcs12_iter = 0
|
|
mac_iter = 0
|
|
elif isinstance(
|
|
encryption_algorithm, serialization.BestAvailableEncryption
|
|
):
|
|
# PKCS12 encryption is hopeless trash and can never be fixed.
|
|
# This is the least terrible option.
|
|
nid_cert = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
|
|
nid_key = self._lib.NID_pbe_WithSHA1And3_Key_TripleDES_CBC
|
|
# At least we can set this higher than OpenSSL's default
|
|
pkcs12_iter = 20000
|
|
# mac_iter chosen for compatibility reasons, see:
|
|
# https://www.openssl.org/docs/man1.1.1/man3/PKCS12_create.html
|
|
# Did we mention how lousy PKCS12 encryption is?
|
|
mac_iter = 1
|
|
password = encryption_algorithm.password
|
|
else:
|
|
raise ValueError("Unsupported key encryption type")
|
|
|
|
if cas is None or len(cas) == 0:
|
|
sk_x509 = self._ffi.NULL
|
|
else:
|
|
sk_x509 = self._lib.sk_X509_new_null()
|
|
sk_x509 = self._ffi.gc(sk_x509, self._lib.sk_X509_free)
|
|
|
|
# reverse the list when building the stack so that they're encoded
|
|
# in the order they were originally provided. it is a mystery
|
|
for ca in reversed(cas):
|
|
res = self._lib.sk_X509_push(sk_x509, ca._x509)
|
|
backend.openssl_assert(res >= 1)
|
|
|
|
with self._zeroed_null_terminated_buf(password) as password_buf:
|
|
with self._zeroed_null_terminated_buf(name) as name_buf:
|
|
p12 = self._lib.PKCS12_create(
|
|
password_buf,
|
|
name_buf,
|
|
key._evp_pkey if key else self._ffi.NULL,
|
|
cert._x509 if cert else self._ffi.NULL,
|
|
sk_x509,
|
|
nid_key,
|
|
nid_cert,
|
|
pkcs12_iter,
|
|
mac_iter,
|
|
0,
|
|
)
|
|
|
|
self.openssl_assert(p12 != self._ffi.NULL)
|
|
p12 = self._ffi.gc(p12, self._lib.PKCS12_free)
|
|
|
|
bio = self._create_mem_bio_gc()
|
|
res = self._lib.i2d_PKCS12_bio(bio, p12)
|
|
self.openssl_assert(res > 0)
|
|
return self._read_mem_bio(bio)
|
|
|
|
def poly1305_supported(self):
|
|
if self._fips_enabled:
|
|
return False
|
|
return self._lib.Cryptography_HAS_POLY1305 == 1
|
|
|
|
def create_poly1305_ctx(self, key):
|
|
utils._check_byteslike("key", key)
|
|
if len(key) != _POLY1305_KEY_SIZE:
|
|
raise ValueError("A poly1305 key is 32 bytes long")
|
|
|
|
return _Poly1305Context(self, key)
|
|
|
|
def load_pem_pkcs7_certificates(self, data):
|
|
utils._check_bytes("data", data)
|
|
bio = self._bytes_to_bio(data)
|
|
p7 = self._lib.PEM_read_bio_PKCS7(
|
|
bio.bio, self._ffi.NULL, self._ffi.NULL, self._ffi.NULL
|
|
)
|
|
if p7 == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError("Unable to parse PKCS7 data")
|
|
|
|
p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
|
|
return self._load_pkcs7_certificates(p7)
|
|
|
|
def load_der_pkcs7_certificates(self, data):
|
|
utils._check_bytes("data", data)
|
|
bio = self._bytes_to_bio(data)
|
|
p7 = self._lib.d2i_PKCS7_bio(bio.bio, self._ffi.NULL)
|
|
if p7 == self._ffi.NULL:
|
|
self._consume_errors()
|
|
raise ValueError("Unable to parse PKCS7 data")
|
|
|
|
p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
|
|
return self._load_pkcs7_certificates(p7)
|
|
|
|
def _load_pkcs7_certificates(self, p7):
|
|
nid = self._lib.OBJ_obj2nid(p7.type)
|
|
self.openssl_assert(nid != self._lib.NID_undef)
|
|
if nid != self._lib.NID_pkcs7_signed:
|
|
raise UnsupportedAlgorithm(
|
|
"Only basic signed structures are currently supported. NID"
|
|
" for this data was {}".format(nid),
|
|
_Reasons.UNSUPPORTED_SERIALIZATION,
|
|
)
|
|
|
|
sk_x509 = p7.d.sign.cert
|
|
num = self._lib.sk_X509_num(sk_x509)
|
|
certs = []
|
|
for i in range(num):
|
|
x509 = self._lib.sk_X509_value(sk_x509, i)
|
|
self.openssl_assert(x509 != self._ffi.NULL)
|
|
res = self._lib.X509_up_ref(x509)
|
|
# When OpenSSL is less than 1.1.0 up_ref returns the current
|
|
# refcount. On 1.1.0+ it returns 1 for success.
|
|
self.openssl_assert(res >= 1)
|
|
x509 = self._ffi.gc(x509, self._lib.X509_free)
|
|
certs.append(_Certificate(self, x509))
|
|
|
|
return certs
|
|
|
|
def pkcs7_sign(self, builder, encoding, options):
|
|
bio = self._bytes_to_bio(builder._data)
|
|
init_flags = self._lib.PKCS7_PARTIAL
|
|
final_flags = 0
|
|
|
|
if len(builder._additional_certs) == 0:
|
|
certs = self._ffi.NULL
|
|
else:
|
|
certs = self._lib.sk_X509_new_null()
|
|
certs = self._ffi.gc(certs, self._lib.sk_X509_free)
|
|
for cert in builder._additional_certs:
|
|
res = self._lib.sk_X509_push(certs, cert._x509)
|
|
self.openssl_assert(res >= 1)
|
|
|
|
if pkcs7.PKCS7Options.DetachedSignature in options:
|
|
# Don't embed the data in the PKCS7 structure
|
|
init_flags |= self._lib.PKCS7_DETACHED
|
|
final_flags |= self._lib.PKCS7_DETACHED
|
|
|
|
# This just inits a structure for us. However, there
|
|
# are flags we need to set, joy.
|
|
p7 = self._lib.PKCS7_sign(
|
|
self._ffi.NULL,
|
|
self._ffi.NULL,
|
|
certs,
|
|
self._ffi.NULL,
|
|
init_flags,
|
|
)
|
|
self.openssl_assert(p7 != self._ffi.NULL)
|
|
p7 = self._ffi.gc(p7, self._lib.PKCS7_free)
|
|
signer_flags = 0
|
|
# These flags are configurable on a per-signature basis
|
|
# but we've deliberately chosen to make the API only allow
|
|
# setting it across all signatures for now.
|
|
if pkcs7.PKCS7Options.NoCapabilities in options:
|
|
signer_flags |= self._lib.PKCS7_NOSMIMECAP
|
|
elif pkcs7.PKCS7Options.NoAttributes in options:
|
|
signer_flags |= self._lib.PKCS7_NOATTR
|
|
|
|
if pkcs7.PKCS7Options.NoCerts in options:
|
|
signer_flags |= self._lib.PKCS7_NOCERTS
|
|
|
|
for certificate, private_key, hash_algorithm in builder._signers:
|
|
md = self._evp_md_non_null_from_algorithm(hash_algorithm)
|
|
p7signerinfo = self._lib.PKCS7_sign_add_signer(
|
|
p7, certificate._x509, private_key._evp_pkey, md, signer_flags
|
|
)
|
|
self.openssl_assert(p7signerinfo != self._ffi.NULL)
|
|
|
|
for option in options:
|
|
# DetachedSignature, NoCapabilities, and NoAttributes are already
|
|
# handled so we just need to check these last two options.
|
|
if option is pkcs7.PKCS7Options.Text:
|
|
final_flags |= self._lib.PKCS7_TEXT
|
|
elif option is pkcs7.PKCS7Options.Binary:
|
|
final_flags |= self._lib.PKCS7_BINARY
|
|
|
|
bio_out = self._create_mem_bio_gc()
|
|
if encoding is serialization.Encoding.SMIME:
|
|
# This finalizes the structure
|
|
res = self._lib.SMIME_write_PKCS7(
|
|
bio_out, p7, bio.bio, final_flags
|
|
)
|
|
elif encoding is serialization.Encoding.PEM:
|
|
res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
|
|
self.openssl_assert(res == 1)
|
|
res = self._lib.PEM_write_bio_PKCS7_stream(
|
|
bio_out, p7, bio.bio, final_flags
|
|
)
|
|
else:
|
|
assert encoding is serialization.Encoding.DER
|
|
# We need to call finalize here becauase i2d_PKCS7_bio does not
|
|
# finalize.
|
|
res = self._lib.PKCS7_final(p7, bio.bio, final_flags)
|
|
self.openssl_assert(res == 1)
|
|
res = self._lib.i2d_PKCS7_bio(bio_out, p7)
|
|
self.openssl_assert(res == 1)
|
|
return self._read_mem_bio(bio_out)
|
|
|
|
|
|
class GetCipherByName(object):
|
|
def __init__(self, fmt):
|
|
self._fmt = fmt
|
|
|
|
def __call__(self, backend, cipher, mode):
|
|
cipher_name = self._fmt.format(cipher=cipher, mode=mode).lower()
|
|
return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
|
|
|
|
|
|
def _get_xts_cipher(backend, cipher, mode):
|
|
cipher_name = "aes-{}-xts".format(cipher.key_size // 2)
|
|
return backend._lib.EVP_get_cipherbyname(cipher_name.encode("ascii"))
|
|
|
|
|
|
backend = Backend()
|