Skip to content

Commit

Permalink
Use PEM parsing provided in pycose library (#218)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivarprudnikov committed Aug 29, 2024
1 parent 0353bfb commit c81408c
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 189 deletions.
173 changes: 4 additions & 169 deletions pyscitt/pyscitt/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,27 +44,8 @@
)
from cryptography.x509 import load_der_x509_certificate, load_pem_x509_certificate
from cryptography.x509.oid import NameOID
from pycose.keys.curves import P256, P384, P521, Ed25519
from pycose.keys.ec2 import EC2Key
from pycose.keys.keyparam import (
EC2KpCurve,
EC2KpD,
EC2KpX,
EC2KpY,
OKPKpCurve,
OKPKpD,
OKPKpX,
RSAKpD,
RSAKpDP,
RSAKpDQ,
RSAKpE,
RSAKpN,
RSAKpP,
RSAKpQ,
RSAKpQInv,
)
from pycose.keys.okp import OKPKey
from pycose.keys.rsa import RSAKey
from pycose.keys.cosekey import CoseKey
from pycose.keys.curves import P256, P384, P521
from pycose.messages import Sign1Message

RECOMMENDED_RSA_PUBLIC_EXPONENT = 65537
Expand Down Expand Up @@ -429,153 +410,6 @@ def parse_cose_sign1(buf: bytes) -> Tuple[dict, bytes]:
return header, payload


def pretty_cose_sign1(buf: bytes) -> str:
header, payload = parse_cose_sign1(buf)
try:
claims = json.loads(payload)
except json.JSONDecodeError:
claims = f"<{len(payload)} bytes>"
try:
return (
"COSE_Sign1(\nheader="
+ json.dumps(header, indent=2)
+ "\npayload="
+ json.dumps(claims, indent=2)
+ "\n<signature>)"
)
except TypeError:
print(header)
raise


# temporary, from https://github.com/BrianSipos/pycose/blob/rsa_keys_algs/cose/keys/rsa.py
# until https://github.com/TimothyClaeys/pycose/issues/44 is implemented
def from_cryptography_rsakey_obj(ext_key: Union[RSAPrivateKey, RSAPublicKey]) -> RSAKey:
"""
Returns an initialized COSE Key object of type RSAKey.
:param ext_key: Python cryptography key.
:return: an initialized RSA key
"""

def to_bstr(dec):
blen = (dec.bit_length() + 7) // 8
return dec.to_bytes(blen, byteorder="big")

if hasattr(ext_key, "private_numbers"):
priv_nums = ext_key.private_numbers()
pub_nums = priv_nums.public_numbers
else:
priv_nums = None
pub_nums = ext_key.public_numbers()

cose_key = {}
if pub_nums:
cose_key.update(
{
RSAKpE: to_bstr(pub_nums.e),
RSAKpN: to_bstr(pub_nums.n),
}
)
if priv_nums:
cose_key.update(
{
RSAKpD: to_bstr(priv_nums.d),
RSAKpP: to_bstr(priv_nums.p),
RSAKpQ: to_bstr(priv_nums.q),
RSAKpDP: to_bstr(priv_nums.dmp1),
RSAKpDQ: to_bstr(priv_nums.dmq1),
RSAKpQInv: to_bstr(priv_nums.iqmp),
}
)
return RSAKey.from_dict(cose_key)


def from_cryptography_eckey_obj(
ext_key: Union[EllipticCurvePrivateKey, EllipticCurvePublicKey]
) -> EC2Key:
"""
Returns an initialized COSE Key object of type EC2Key.
:param ext_key: Python cryptography key.
:return: an initialized EC key
"""
if hasattr(ext_key, "private_numbers"):
priv_nums = ext_key.private_numbers()
pub_nums = priv_nums.public_numbers
else:
priv_nums = None
pub_nums = ext_key.public_numbers()

_, curve = cose_curve_from_ec(pub_nums.curve)

cose_key = {}
if pub_nums:
cose_key.update(
{
EC2KpCurve: curve,
EC2KpX: pub_nums.x.to_bytes(curve.size, "big"),
EC2KpY: pub_nums.y.to_bytes(curve.size, "big"),
}
)
if priv_nums:
cose_key.update(
{
EC2KpD: priv_nums.private_value.to_bytes(curve.size, "big"),
}
)
return EC2Key.from_dict(cose_key)


def from_cryptography_ed25519key_obj(
ext_key: Union[Ed25519PrivateKey, Ed25519PublicKey]
) -> OKPKey:
"""
Returns an initialized COSE Key object of type OKPKey.
:param ext_key: Python cryptography key.
:return: an initialized OKP key
"""
if isinstance(ext_key, Ed25519PrivateKey):
priv_bytes = ext_key.private_bytes(
encoding=Encoding.Raw,
format=PrivateFormat.Raw,
encryption_algorithm=NoEncryption(),
)
pub_bytes = ext_key.public_key().public_bytes(
encoding=Encoding.Raw, format=PublicFormat.Raw
)
else:
assert isinstance(ext_key, Ed25519PublicKey)
priv_bytes = None
pub_bytes = ext_key.public_bytes(encoding=Encoding.Raw, format=PublicFormat.Raw)

curve = Ed25519

cose_key = {}
cose_key.update(
{
OKPKpCurve: curve,
OKPKpX: pub_bytes,
}
)
if priv_bytes:
cose_key.update(
{
OKPKpD: priv_bytes,
}
)
return OKPKey.from_dict(cose_key)


def cose_private_key_from_pem(pem: Pem):
key = load_pem_private_key(pem.encode("ascii"), None)
if isinstance(key, RSAPrivateKey):
return from_cryptography_rsakey_obj(key)
elif isinstance(key, EllipticCurvePrivateKey):
return from_cryptography_eckey_obj(key)
elif isinstance(key, Ed25519PrivateKey):
return from_cryptography_ed25519key_obj(key)
raise NotImplementedError("unsupported key type")


def b64url(b: bytes) -> str:
return base64.b64encode(b, altchars=b"-_").decode("ascii")

Expand Down Expand Up @@ -710,6 +544,7 @@ def __init__(
self.x5c = x5c


# TODO: merge with Key Vault signer implementation
def sign_claimset(
signer: Signer,
claims: bytes,
Expand Down Expand Up @@ -748,7 +583,7 @@ def sign_claimset(
headers[COSE_HEADER_PARAM_REGISTRATION_INFO] = registration_info

msg = Sign1Message(phdr=headers, payload=claims)
msg.key = cose_private_key_from_pem(signer.private_key)
msg.key = CoseKey.from_pem_private_key(signer.private_key)
return msg.encode(tag=True)


Expand Down
21 changes: 3 additions & 18 deletions pyscitt/pyscitt/verify.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,11 @@

import cbor2
import pycose
from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
from cryptography.x509 import load_pem_x509_certificate
from pycose.keys.ec2 import EC2Key
from pycose.keys.rsa import RSAKey
from pycose.keys.cosekey import CoseKey
from pycose.messages import Sign1Message

from . import crypto, did
from .crypto import COSE_HEADER_PARAM_ISSUER, COSE_HEADER_PARAM_SCITT_RECEIPTS
from .crypto import COSE_HEADER_PARAM_ISSUER
from .receipt import Receipt


Expand Down Expand Up @@ -63,19 +59,8 @@ def lookup(self, phdr) -> ServiceParameters:


def verify_cose_sign1(buf: bytes, cert_pem: str):
cert = load_pem_x509_certificate(cert_pem.encode("ascii"))
key = cert.public_key()

cose_key: Union[EC2Key, RSAKey]
if isinstance(key, RSAPublicKey):
cose_key = crypto.from_cryptography_rsakey_obj(key)
elif isinstance(key, EllipticCurvePublicKey):
cose_key = crypto.from_cryptography_eckey_obj(key)
else:
raise NotImplementedError("unsupported key type")

msg = Sign1Message.decode(buf)
msg.key = cose_key
msg.key = msg.key = CoseKey.from_pem_public_key(cert_pem)
if not msg.verify_signature():
raise ValueError("signature is invalid")

Expand Down
3 changes: 2 additions & 1 deletion test/test_encoding.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import cbor2
import pycose
import pytest
from pycose.keys.cosekey import CoseKey
from pycose.messages import Sign1Message

from pyscitt import crypto
Expand Down Expand Up @@ -123,7 +124,7 @@ def sign(signer: crypto.Signer, payload: bytes, parameters: dict, *, canonical=T

encoded_headers = cbor_encode(parameters, canonical=canonical)

key = crypto.cose_private_key_from_pem(signer.private_key)
key = CoseKey.from_pem_private_key(signer.private_key)

tbs = cbor_encode(["Signature1", encoded_headers, b"", payload], canonical=True)
signature = algorithm.sign(key, tbs)
Expand Down
3 changes: 2 additions & 1 deletion test/test_x509.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import cbor2
import pycose
import pytest
from pycose.keys.cosekey import CoseKey
from pycose.messages import Sign1Message

from pyscitt import crypto, governance
Expand Down Expand Up @@ -248,7 +249,7 @@ def test_submit_claim_notary_x509(
).encode("utf-8")

msg = Sign1Message(phdr=phdr, uhdr=uhdr, payload=payload)
msg.key = crypto.cose_private_key_from_pem(identity.private_key)
msg.key = CoseKey.from_pem_private_key(identity.private_key)
claim = msg.encode(tag=True)

submission = client.submit_claim_and_confirm(claim)
Expand Down

0 comments on commit c81408c

Please sign in to comment.