From c81408cc81b4b25a943d558f24aa86ff9a15126e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ivar=20=28a=C9=AAv=C9=91r=29?= Date: Thu, 29 Aug 2024 15:47:39 +0100 Subject: [PATCH] Use PEM parsing provided in pycose library (#218) --- pyscitt/pyscitt/crypto.py | 173 +------------------------------------- pyscitt/pyscitt/verify.py | 21 +---- test/test_encoding.py | 3 +- test/test_x509.py | 3 +- 4 files changed, 11 insertions(+), 189 deletions(-) diff --git a/pyscitt/pyscitt/crypto.py b/pyscitt/pyscitt/crypto.py index 0e855c61..6c2c519d 100644 --- a/pyscitt/pyscitt/crypto.py +++ b/pyscitt/pyscitt/crypto.py @@ -44,27 +44,8 @@ ) from cryptography.x509 import load_der_x509_certificate, load_pem_x509_certificate from cryptography.x509.oid import NameOID -from pycose.keys.curves import P256, P384, P521, Ed25519 -from pycose.keys.ec2 import EC2Key -from pycose.keys.keyparam import ( - EC2KpCurve, - EC2KpD, - EC2KpX, - EC2KpY, - OKPKpCurve, - OKPKpD, - OKPKpX, - RSAKpD, - RSAKpDP, - RSAKpDQ, - RSAKpE, - RSAKpN, - RSAKpP, - RSAKpQ, - RSAKpQInv, -) -from pycose.keys.okp import OKPKey -from pycose.keys.rsa import RSAKey +from pycose.keys.cosekey import CoseKey +from pycose.keys.curves import P256, P384, P521 from pycose.messages import Sign1Message RECOMMENDED_RSA_PUBLIC_EXPONENT = 65537 @@ -429,153 +410,6 @@ def parse_cose_sign1(buf: bytes) -> Tuple[dict, bytes]: return header, payload -def pretty_cose_sign1(buf: bytes) -> str: - header, payload = parse_cose_sign1(buf) - try: - claims = json.loads(payload) - except json.JSONDecodeError: - claims = f"<{len(payload)} bytes>" - try: - return ( - "COSE_Sign1(\nheader=" - + json.dumps(header, indent=2) - + "\npayload=" - + json.dumps(claims, indent=2) - + "\n)" - ) - except TypeError: - print(header) - raise - - -# temporary, from https://github.com/BrianSipos/pycose/blob/rsa_keys_algs/cose/keys/rsa.py -# until https://github.com/TimothyClaeys/pycose/issues/44 is implemented -def from_cryptography_rsakey_obj(ext_key: Union[RSAPrivateKey, RSAPublicKey]) -> RSAKey: - """ - Returns an initialized COSE Key object of type RSAKey. - :param ext_key: Python cryptography key. - :return: an initialized RSA key - """ - - def to_bstr(dec): - blen = (dec.bit_length() + 7) // 8 - return dec.to_bytes(blen, byteorder="big") - - if hasattr(ext_key, "private_numbers"): - priv_nums = ext_key.private_numbers() - pub_nums = priv_nums.public_numbers - else: - priv_nums = None - pub_nums = ext_key.public_numbers() - - cose_key = {} - if pub_nums: - cose_key.update( - { - RSAKpE: to_bstr(pub_nums.e), - RSAKpN: to_bstr(pub_nums.n), - } - ) - if priv_nums: - cose_key.update( - { - RSAKpD: to_bstr(priv_nums.d), - RSAKpP: to_bstr(priv_nums.p), - RSAKpQ: to_bstr(priv_nums.q), - RSAKpDP: to_bstr(priv_nums.dmp1), - RSAKpDQ: to_bstr(priv_nums.dmq1), - RSAKpQInv: to_bstr(priv_nums.iqmp), - } - ) - return RSAKey.from_dict(cose_key) - - -def from_cryptography_eckey_obj( - ext_key: Union[EllipticCurvePrivateKey, EllipticCurvePublicKey] -) -> EC2Key: - """ - Returns an initialized COSE Key object of type EC2Key. - :param ext_key: Python cryptography key. - :return: an initialized EC key - """ - if hasattr(ext_key, "private_numbers"): - priv_nums = ext_key.private_numbers() - pub_nums = priv_nums.public_numbers - else: - priv_nums = None - pub_nums = ext_key.public_numbers() - - _, curve = cose_curve_from_ec(pub_nums.curve) - - cose_key = {} - if pub_nums: - cose_key.update( - { - EC2KpCurve: curve, - EC2KpX: pub_nums.x.to_bytes(curve.size, "big"), - EC2KpY: pub_nums.y.to_bytes(curve.size, "big"), - } - ) - if priv_nums: - cose_key.update( - { - EC2KpD: priv_nums.private_value.to_bytes(curve.size, "big"), - } - ) - return EC2Key.from_dict(cose_key) - - -def from_cryptography_ed25519key_obj( - ext_key: Union[Ed25519PrivateKey, Ed25519PublicKey] -) -> OKPKey: - """ - Returns an initialized COSE Key object of type OKPKey. - :param ext_key: Python cryptography key. - :return: an initialized OKP key - """ - if isinstance(ext_key, Ed25519PrivateKey): - priv_bytes = ext_key.private_bytes( - encoding=Encoding.Raw, - format=PrivateFormat.Raw, - encryption_algorithm=NoEncryption(), - ) - pub_bytes = ext_key.public_key().public_bytes( - encoding=Encoding.Raw, format=PublicFormat.Raw - ) - else: - assert isinstance(ext_key, Ed25519PublicKey) - priv_bytes = None - pub_bytes = ext_key.public_bytes(encoding=Encoding.Raw, format=PublicFormat.Raw) - - curve = Ed25519 - - cose_key = {} - cose_key.update( - { - OKPKpCurve: curve, - OKPKpX: pub_bytes, - } - ) - if priv_bytes: - cose_key.update( - { - OKPKpD: priv_bytes, - } - ) - return OKPKey.from_dict(cose_key) - - -def cose_private_key_from_pem(pem: Pem): - key = load_pem_private_key(pem.encode("ascii"), None) - if isinstance(key, RSAPrivateKey): - return from_cryptography_rsakey_obj(key) - elif isinstance(key, EllipticCurvePrivateKey): - return from_cryptography_eckey_obj(key) - elif isinstance(key, Ed25519PrivateKey): - return from_cryptography_ed25519key_obj(key) - raise NotImplementedError("unsupported key type") - - def b64url(b: bytes) -> str: return base64.b64encode(b, altchars=b"-_").decode("ascii") @@ -710,6 +544,7 @@ def __init__( self.x5c = x5c +# TODO: merge with Key Vault signer implementation def sign_claimset( signer: Signer, claims: bytes, @@ -748,7 +583,7 @@ def sign_claimset( headers[COSE_HEADER_PARAM_REGISTRATION_INFO] = registration_info msg = Sign1Message(phdr=headers, payload=claims) - msg.key = cose_private_key_from_pem(signer.private_key) + msg.key = CoseKey.from_pem_private_key(signer.private_key) return msg.encode(tag=True) diff --git a/pyscitt/pyscitt/verify.py b/pyscitt/pyscitt/verify.py index bc00e99f..4e7930cc 100644 --- a/pyscitt/pyscitt/verify.py +++ b/pyscitt/pyscitt/verify.py @@ -10,15 +10,11 @@ import cbor2 import pycose -from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey -from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey -from cryptography.x509 import load_pem_x509_certificate -from pycose.keys.ec2 import EC2Key -from pycose.keys.rsa import RSAKey +from pycose.keys.cosekey import CoseKey from pycose.messages import Sign1Message from . import crypto, did -from .crypto import COSE_HEADER_PARAM_ISSUER, COSE_HEADER_PARAM_SCITT_RECEIPTS +from .crypto import COSE_HEADER_PARAM_ISSUER from .receipt import Receipt @@ -63,19 +59,8 @@ def lookup(self, phdr) -> ServiceParameters: def verify_cose_sign1(buf: bytes, cert_pem: str): - cert = load_pem_x509_certificate(cert_pem.encode("ascii")) - key = cert.public_key() - - cose_key: Union[EC2Key, RSAKey] - if isinstance(key, RSAPublicKey): - cose_key = crypto.from_cryptography_rsakey_obj(key) - elif isinstance(key, EllipticCurvePublicKey): - cose_key = crypto.from_cryptography_eckey_obj(key) - else: - raise NotImplementedError("unsupported key type") - msg = Sign1Message.decode(buf) - msg.key = cose_key + msg.key = msg.key = CoseKey.from_pem_public_key(cert_pem) if not msg.verify_signature(): raise ValueError("signature is invalid") diff --git a/test/test_encoding.py b/test/test_encoding.py index 65703b8d..f90eef8e 100644 --- a/test/test_encoding.py +++ b/test/test_encoding.py @@ -7,6 +7,7 @@ import cbor2 import pycose import pytest +from pycose.keys.cosekey import CoseKey from pycose.messages import Sign1Message from pyscitt import crypto @@ -123,7 +124,7 @@ def sign(signer: crypto.Signer, payload: bytes, parameters: dict, *, canonical=T encoded_headers = cbor_encode(parameters, canonical=canonical) - key = crypto.cose_private_key_from_pem(signer.private_key) + key = CoseKey.from_pem_private_key(signer.private_key) tbs = cbor_encode(["Signature1", encoded_headers, b"", payload], canonical=True) signature = algorithm.sign(key, tbs) diff --git a/test/test_x509.py b/test/test_x509.py index 14212caf..319374ca 100644 --- a/test/test_x509.py +++ b/test/test_x509.py @@ -8,6 +8,7 @@ import cbor2 import pycose import pytest +from pycose.keys.cosekey import CoseKey from pycose.messages import Sign1Message from pyscitt import crypto, governance @@ -248,7 +249,7 @@ def test_submit_claim_notary_x509( ).encode("utf-8") msg = Sign1Message(phdr=phdr, uhdr=uhdr, payload=payload) - msg.key = crypto.cose_private_key_from_pem(identity.private_key) + msg.key = CoseKey.from_pem_private_key(identity.private_key) claim = msg.encode(tag=True) submission = client.submit_claim_and_confirm(claim)