diff --git a/certomancer/__main__.py b/certomancer/__main__.py index fac1cd4..557daa7 100644 --- a/certomancer/__main__.py +++ b/certomancer/__main__.py @@ -1,6 +1,8 @@ +from typing import List + from .cli import cli -__all__ = [] +__all__: List[str] = [] def launch(): diff --git a/certomancer/config_utils.py b/certomancer/config_utils.py index be92b72..3932d9e 100644 --- a/certomancer/config_utils.py +++ b/certomancer/config_utils.py @@ -13,7 +13,7 @@ from collections.abc import Callable from datetime import timedelta - +from typing import Optional, Any, Tuple __all__ = [ 'ConfigurationError', 'ConfigurableMixin', 'check_config_keys', @@ -22,8 +22,6 @@ 'plugin_instantiate_util' ] -from typing import Optional - _noneType = type(None) @@ -40,7 +38,7 @@ class LabelString: __slots__ = ['value'] @staticmethod - def get_subclass(thing) -> Optional[type]: + def get_subclass(thing) -> Optional[type]: # type: ignore """ Figure out if the annotation 'thing' describes a label type. Used in config ingestion logic to instantiate dataclasses. @@ -55,7 +53,7 @@ def get_subclass(thing) -> Optional[type]: try: from typing import get_args except ImportError: - def get_args(tp): + def get_args(tp: Any) -> Tuple[Any, ...]: try: return tp.__args__ except AttributeError: diff --git a/certomancer/crypto_utils.py b/certomancer/crypto_utils.py index 32d6b46..eda5d9e 100644 --- a/certomancer/crypto_utils.py +++ b/certomancer/crypto_utils.py @@ -161,10 +161,10 @@ def generic_sign(self, private_key: keys.PrivateKeyInfo, tbs_bytes: bytes, digest_algorithm = sd_algo.hash_algo sig_algo = sd_algo.signature_algo if sig_algo == 'rsassa_pkcs1v15': - padding = padding.PKCS1v15() + asym_padding = padding.PKCS1v15() hash_algo = getattr(hashes, digest_algorithm.upper())() assert isinstance(priv_key, rsa.RSAPrivateKey) - return priv_key.sign(tbs_bytes, padding, hash_algo) + return priv_key.sign(tbs_bytes, asym_padding, hash_algo) elif sig_algo == 'rsassa_pss': parameters = None if private_key.algorithm == 'rsassa_pss': @@ -225,8 +225,8 @@ def optimal_pss_params(self, key: keys.PublicKeyInfo, key = key.copy() key['algorithm'] = {'algorithm': 'rsa'} - loaded_key: rsa.RSAPublicKey \ - = serialization.load_der_public_key(key.dump()) + loaded_key = serialization.load_der_public_key(key.dump()) + assert isinstance(loaded_key, rsa.RSAPublicKey) md = getattr(hashes, digest_algo.upper()) # the PSS salt calculation function is not in the .pyi file, apparently. # noinspection PyUnresolvedReferences diff --git a/certomancer/default_plugins.py b/certomancer/default_plugins.py index bd371c5..7f056ce 100644 --- a/certomancer/default_plugins.py +++ b/certomancer/default_plugins.py @@ -1,6 +1,6 @@ import binascii import itertools -from typing import Optional, Any, List +from typing import Optional, Any, List, Iterable, Tuple, Dict from asn1crypto import x509, core, cms from asn1crypto.core import ObjectIdentifier @@ -221,6 +221,7 @@ def _parse_target(entities, params): def provision(self, extn_id, arch: 'PKIArchitecture', params): from ._asn1_types import Target, Targets, SequenceOfTargets + targets: Iterable[Tuple[x509.GeneralName, bool]] if isinstance(params, list): targets = ( ACTargetsPlugin._parse_target(arch.entities, t) @@ -490,6 +491,8 @@ def extensions_for_self(self, arch: 'PKIArchitecture', profile_params: Any, "'simple-ca' can only be used on public-key certificates" ) profile_params = self._normalise_params(profile_params) + + bc_value: Dict[str, Any] bc_value = {'ca': True} try: path_len = int(profile_params['max-path-len']) diff --git a/certomancer/integrations/alchemist.py b/certomancer/integrations/alchemist.py index ef0501b..4ff1ef6 100644 --- a/certomancer/integrations/alchemist.py +++ b/certomancer/integrations/alchemist.py @@ -98,6 +98,7 @@ def open_pkcs11_session(lib_location: str, f'Token in slot {slot_no} is not {token_label}.' ) + kwargs: Dict[str, Any] kwargs = {'rw': rw} if pin is not None: kwargs['so_pin' if as_so else 'user_pin'] = pin diff --git a/certomancer/integrations/animator.py b/certomancer/integrations/animator.py index b205a8b..c50b2b1 100644 --- a/certomancer/integrations/animator.py +++ b/certomancer/integrations/animator.py @@ -227,7 +227,6 @@ def __init__(self, architectures: AnimatorArchStore, self.fixed_time = at_time self.architectures = architectures self.with_web_ui = with_web_ui - self.url_map = None self.allow_time_override = allow_time_override self.url_map = Map( @@ -329,10 +328,10 @@ def serve_cert(self, _request: Request, *, label: str, arch: str, cert_label: Optional[str], use_pem): mime = 'application/x-pem-file' if use_pem else 'application/pkix-cert' pki_arch = self.architectures[ArchLabel(arch)] - cert_label = CertLabel(cert_label) if cert_label is not None else None + cert_lbl = CertLabel(cert_label) if cert_label is not None else None cert = pki_arch.service_registry.get_cert_from_repo( - ServiceLabel(label), cert_label + ServiceLabel(label), cert_lbl ) if cert is None: raise NotFound() @@ -419,10 +418,10 @@ def serve_plugin(self, request: Request, plugin_label: str, *, label: str, arch: str): pki_arch = self.architectures[ArchLabel(arch)] services = pki_arch.service_registry - plugin_label = PluginLabel(plugin_label) - label = ServiceLabel(label) + plugin_lbl = PluginLabel(plugin_label) + svc_lbl = ServiceLabel(label) try: - plugin_info = services.get_plugin_info(plugin_label, label) + plugin_info = services.get_plugin_info(plugin_lbl, svc_lbl) except ConfigurationError: raise NotFound() @@ -430,7 +429,7 @@ def serve_plugin(self, request: Request, plugin_label: str, *, label: str, req_content = request.stream.read() try: response_bytes = services.invoke_plugin( - plugin_label, label, req_content, at_time=self.at_time(request) + plugin_lbl, svc_lbl, req_content, at_time=self.at_time(request) ) except PluginServiceRequestError as e: raise BadRequest(e.user_msg) @@ -456,14 +455,14 @@ def serve_pfx(self, request: Request, *, arch): except KeyError: raise BadRequest() - cert = CertLabel(cert) + cert_label = CertLabel(cert) if not (pyca_cryptography_present() and - pki_arch.is_subject_key_available(cert)): + pki_arch.is_subject_key_available(cert_label)): raise NotFound() pass_bytes = request.form.get('passphrase', '').encode('utf8') - data = pki_arch.package_pkcs12(cert, password=pass_bytes or None) - cd_header = f'attachment; filename="{cert}.pfx"' + data = pki_arch.package_pkcs12(cert_label, password=pass_bytes or None) + cd_header = f'attachment; filename="{cert_label}.pfx"' return Response(data, mimetype='application/x-pkcs12', headers={'Content-Disposition': cd_header}) diff --git a/certomancer/py.typed b/certomancer/py.typed new file mode 100644 index 0000000..e69de29 diff --git a/certomancer/registry/config.py b/certomancer/registry/config.py index 3edfbef..05fe4cd 100644 --- a/certomancer/registry/config.py +++ b/certomancer/registry/config.py @@ -106,13 +106,16 @@ def __init__(self, config, key_search_dir: str, "'pki-architectures' must be present in configuration" ) from e - if config_search_dir is not None: - config_search_dir = SearchDir(config_search_dir) + search_dir = ( + SearchDir(config_search_dir) + if config_search_dir is not None + else None + ) self.pki_archs = { arch.arch_label: arch for arch in PKIArchitecture.build_architectures( key_sets, arch_cfgs, external_url_prefix=external_url_prefix, - config_search_dir=config_search_dir + config_search_dir=search_dir ) } diff --git a/certomancer/registry/issued/attr_cert.py b/certomancer/registry/issued/attr_cert.py index 1ca810b..bb6abe6 100644 --- a/certomancer/registry/issued/attr_cert.py +++ b/certomancer/registry/issued/attr_cert.py @@ -1,6 +1,6 @@ import hashlib from dataclasses import dataclass -from typing import Optional, List, TYPE_CHECKING +from typing import Optional, List, TYPE_CHECKING, Dict, Any from asn1crypto import x509, cms, keys @@ -85,7 +85,7 @@ def process_entries(cls, config_dict): pass def to_asn1(self, arch: 'PKIArchitecture') -> cms.Holder: - result = {} + result: Dict[str, Any] = {} holder_cert_label = self.cert \ or arch.get_unique_cert_for_entity(self.name) holder_cert: x509.Certificate = arch.get_cert(holder_cert_label) diff --git a/certomancer/registry/keys.py b/certomancer/registry/keys.py index 9bfa29d..9438e7f 100644 --- a/certomancer/registry/keys.py +++ b/certomancer/registry/keys.py @@ -85,17 +85,23 @@ def _load(self): @property def public_key_info(self) -> PublicKeyInfo: self._load() - return self._key.public + key = self._key + assert key is not None + return key.public @property def private_key_info(self) -> Optional[PrivateKeyInfo]: self._load() - return self._key.private + key = self._key + assert key is not None + return key.private @property def key_pair(self) -> AsymKey: self._load() - return self._key + key = self._key + assert key is not None + return key class KeySet: diff --git a/certomancer/registry/pki_arch.py b/certomancer/registry/pki_arch.py index ef85ef7..d2bd652 100644 --- a/certomancer/registry/pki_arch.py +++ b/certomancer/registry/pki_arch.py @@ -5,12 +5,13 @@ from collections import defaultdict from dataclasses import dataclass, field from datetime import datetime, timedelta -from typing import Optional, List, Dict, Iterable, Tuple +from typing import Optional, List, Dict, Iterable, Tuple, Union, Any from zipfile import ZipFile import yaml from asn1crypto import x509, core, pem, ocsp, crl, cms import tzlocal +from cryptography.hazmat.primitives._serialization import KeySerializationEncryption from . import plugin_api from .common import ArchLabel, ServiceLabel, CertLabel, KeyLabel, EntityLabel, \ @@ -292,13 +293,15 @@ def build_architecture(cls, arch_label: ArchLabel, cfg: dict, ) @classmethod - def build_architectures(cls, key_sets: KeySets, cfgs, external_url_prefix, + def build_architectures(cls, key_sets: KeySets, + cfgs: Dict[str, Any], + external_url_prefix: str, config_search_dir: Optional[SearchDir], extension_plugins: ExtensionPluginRegistry = None, service_plugins: 'ServicePluginRegistry' = None): - arch_specs = {} - for arch_label, cfg in cfgs.items(): - arch_label = ArchLabel(arch_label) + arch_specs: Dict[ArchLabel, Dict[str, Any]] = {} + for lbl, cfg in cfgs.items(): + arch_label = ArchLabel(lbl) # external config if isinstance(cfg, str): if config_search_dir is None: @@ -510,6 +513,7 @@ def enumerate_attr_certs_by_issuer(self) \ def enumerate_attr_certs_of_holder(self, holder_name: EntityLabel, issuer: Optional[EntityLabel] = None): + relevant: Iterable[CertLabel] # slow, but eh, it'll do if issuer is None: relevant = itertools.chain(*self._ac_labels_by_issuer.values()) @@ -539,6 +543,9 @@ def package_pkcs12(self, cert_label: CertLabel, pkcs12, load_der_private_key, NoEncryption, BestAvailableEncryption ) + from cryptography.hazmat.primitives.asymmetric import ( + rsa, dsa, ec, ed25519, ed448 + ) from cryptography import x509 as pyca_x509 except ImportError as e: # pragma: nocover raise CertomancerServiceError( @@ -559,8 +566,16 @@ def package_pkcs12(self, cert_label: CertLabel, # convert DER to pyca/cryptography internal objects cert = pyca_x509.load_der_x509_certificate(cert_der) key = load_der_private_key(key_der, password=None) + assert isinstance(key, ( + rsa.RSAPrivateKey, + dsa.DSAPrivateKey, + ec.EllipticCurvePrivateKey, + ed25519.Ed25519PrivateKey, + ed448.Ed448PrivateKey, + )) chain = [pyca_x509.load_der_x509_certificate(c) for c in chain_der] + encryption_alg: KeySerializationEncryption if not password: encryption_alg = NoEncryption() else: @@ -1154,6 +1169,7 @@ def _check_repo_membership(self, repo_info: BaseCertRepoServiceInfo, cert_label: CertLabel, is_attr=False): # check if the cert in question actually belongs to the repo # (i.e. whether it is issued by the right entity) + cert_spec: IssuedItemSpec if is_attr: cert_spec = self.pki_arch.get_attr_cert_spec(cert_label) else: diff --git a/certomancer/registry/plugin_api.py b/certomancer/registry/plugin_api.py index ce0ef10..291f668 100644 --- a/certomancer/registry/plugin_api.py +++ b/certomancer/registry/plugin_api.py @@ -69,8 +69,8 @@ class ExtensionPlugin(abc.ABC): Plugins must be stateless. """ - schema_label: str = None - extension_type: Type[ObjectIdentifier] = None + schema_label: str + extension_type: Optional[Type[ObjectIdentifier]] = None def provision(self, extn_id: Optional[ObjectIdentifier], arch: 'PKIArchitecture', params): @@ -152,11 +152,13 @@ def process_value(self, extn_id: str, f"There is no registered plugin for the schema " f"'{spec.schema}'." ) from e + + extn_oid: Optional[ObjectIdentifier] if proc.extension_type is not None: - extn_id = proc.extension_type(extn_id) + extn_oid = proc.extension_type(extn_id) else: - extn_id = None - provisioned_value = proc.provision(extn_id, arch, spec.params) + extn_oid = None + provisioned_value = proc.provision(extn_oid, arch, spec.params) if isinstance(provisioned_value, core.Asn1Value) and \ not isinstance(provisioned_value, core.ParsableOctetString): # this allows plugins to keep working with extensions for which @@ -169,7 +171,7 @@ def process_value(self, extn_id: str, class AttributePlugin(abc.ABC): # FIXME give attribute plugins an API to determine how they want # to handle multivalued attrs (repeated invocation or in bulk) - schema_label: str = None + schema_label: str def provision(self, attr_id: Optional[ObjectIdentifier], arch: 'PKIArchitecture', params): @@ -358,7 +360,7 @@ class ServicePlugin(abc.ABC): course always implement a no-op :meth:`invoke`, and wrap the Animator WSGI application to intercept requests as necessary. """ - plugin_label: str = None + plugin_label: str content_type: str = 'application/octet-stream' """ diff --git a/certomancer/registry/svc_config/ocsp.py b/certomancer/registry/svc_config/ocsp.py index 0ed9ae6..64f4d63 100644 --- a/certomancer/registry/svc_config/ocsp.py +++ b/certomancer/registry/svc_config/ocsp.py @@ -37,14 +37,11 @@ class OCSPResponderServiceInfo(ServiceInfo): """ - signing_key: Optional[KeyLabel] = None + signing_key: KeyLabel """ Key to use to sign the OCSP response. - Will be derived from ``responder_cert`` if not specified. - - .. note:: - This option exists only to allow invalid OCSP responses to be created. + Will be derived from ``responder_cert`` if not specified in config. """ signature_algo: Optional[str] = None diff --git a/certomancer/registry/svc_config/tsa.py b/certomancer/registry/svc_config/tsa.py index d1e0774..f6ba1f5 100644 --- a/certomancer/registry/svc_config/tsa.py +++ b/certomancer/registry/svc_config/tsa.py @@ -20,9 +20,10 @@ class TSAServiceInfo(ServiceInfo): Label of the signer's certificate. """ - signing_key: Optional[KeyLabel] = None + signing_key: KeyLabel """ - Key to sign responses with. Ordinarily derived from :attr:`signing_cert`. + Key to sign responses with. Ordinarily derived from :attr:`signing_cert` + when not specified in config. """ signature_algo: Optional[str] = None diff --git a/certomancer/services.py b/certomancer/services.py index c8996f8..f69f47a 100644 --- a/certomancer/services.py +++ b/certomancer/services.py @@ -73,7 +73,7 @@ def request_tsa_response(self, req: tsp.TimeStampReq) -> tsp.TimeStampResp: 'algorithm': md_algorithm }) dt = self.fixed_dt or datetime.now(tz=tzlocal.get_localzone()) - tst_info = { + tst_info_dict = { 'version': 'v1', 'policy': self.policy, 'message_imprint': message_imprint, @@ -85,9 +85,9 @@ def request_tsa_response(self, req: tsp.TimeStampReq) -> tsp.TimeStampResp: } if isinstance(req['nonce'], core.Integer): - tst_info['nonce'] = req['nonce'] + tst_info_dict['nonce'] = req['nonce'] - tst_info = tsp.TSTInfo(tst_info) + tst_info: tsp.TSTInfo = tsp.TSTInfo(tst_info_dict) tst_info_data = tst_info.dump() message_digest = getattr(hashlib, md_algorithm)(tst_info_data).digest() signing_cert_id = tsp.ESSCertID({ diff --git a/mypy.ini b/mypy.ini new file mode 100644 index 0000000..9f985f2 --- /dev/null +++ b/mypy.ini @@ -0,0 +1,11 @@ +[mypy] + files = certomancer + +[mypy-asn1crypto.*] + ignore_missing_imports = True + +[mypy-pkcs11.*] + ignore_missing_imports = True + +[mypy-oscrypto.*] + ignore_missing_imports = True diff --git a/setup.py b/setup.py index 059bb12..d48cfbf 100644 --- a/setup.py +++ b/setup.py @@ -32,7 +32,10 @@ def get_version(): description='PKI testing tool', long_description=long_description, long_description_content_type='text/markdown', - package_data={'certomancer.integrations': ['animator_templates/*.html']}, + package_data={ + 'certomancer.integrations': ['animator_templates/*.html'], + 'certomancer': ['py.typed'] + }, classifiers=[ 'Development Status :: 4 - Beta',