Skip to content

Commit

Permalink
Configure mypy and fix issues
Browse files Browse the repository at this point in the history
Fixes #2
  • Loading branch information
MatthiasValvekens committed Aug 19, 2022
1 parent 28c8834 commit 6009b8c
Show file tree
Hide file tree
Showing 17 changed files with 95 additions and 53 deletions.
4 changes: 3 additions & 1 deletion certomancer/__main__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from typing import List

from .cli import cli

__all__ = []
__all__: List[str] = []


def launch():
Expand Down
8 changes: 3 additions & 5 deletions certomancer/config_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from collections.abc import Callable

from datetime import timedelta

from typing import Optional, Any, Tuple

__all__ = [
'ConfigurationError', 'ConfigurableMixin', 'check_config_keys',
Expand All @@ -22,8 +22,6 @@
'plugin_instantiate_util'
]

from typing import Optional

_noneType = type(None)


Expand All @@ -40,7 +38,7 @@ class LabelString:
__slots__ = ['value']

@staticmethod
def get_subclass(thing) -> Optional[type]:
def get_subclass(thing) -> Optional[type]: # type: ignore
"""
Figure out if the annotation 'thing' describes a label type.
Used in config ingestion logic to instantiate dataclasses.
Expand All @@ -55,7 +53,7 @@ def get_subclass(thing) -> Optional[type]:
try:
from typing import get_args
except ImportError:
def get_args(tp):
def get_args(tp: Any) -> Tuple[Any, ...]:
try:
return tp.__args__
except AttributeError:
Expand Down
8 changes: 4 additions & 4 deletions certomancer/crypto_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,10 @@ def generic_sign(self, private_key: keys.PrivateKeyInfo, tbs_bytes: bytes,
digest_algorithm = sd_algo.hash_algo
sig_algo = sd_algo.signature_algo
if sig_algo == 'rsassa_pkcs1v15':
padding = padding.PKCS1v15()
asym_padding = padding.PKCS1v15()
hash_algo = getattr(hashes, digest_algorithm.upper())()
assert isinstance(priv_key, rsa.RSAPrivateKey)
return priv_key.sign(tbs_bytes, padding, hash_algo)
return priv_key.sign(tbs_bytes, asym_padding, hash_algo)
elif sig_algo == 'rsassa_pss':
parameters = None
if private_key.algorithm == 'rsassa_pss':
Expand Down Expand Up @@ -225,8 +225,8 @@ def optimal_pss_params(self, key: keys.PublicKeyInfo,
key = key.copy()
key['algorithm'] = {'algorithm': 'rsa'}

loaded_key: rsa.RSAPublicKey \
= serialization.load_der_public_key(key.dump())
loaded_key = serialization.load_der_public_key(key.dump())
assert isinstance(loaded_key, rsa.RSAPublicKey)
md = getattr(hashes, digest_algo.upper())
# the PSS salt calculation function is not in the .pyi file, apparently.
# noinspection PyUnresolvedReferences
Expand Down
5 changes: 4 additions & 1 deletion certomancer/default_plugins.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import binascii
import itertools
from typing import Optional, Any, List
from typing import Optional, Any, List, Iterable, Tuple, Dict

from asn1crypto import x509, core, cms
from asn1crypto.core import ObjectIdentifier
Expand Down Expand Up @@ -221,6 +221,7 @@ def _parse_target(entities, params):

def provision(self, extn_id, arch: 'PKIArchitecture', params):
from ._asn1_types import Target, Targets, SequenceOfTargets
targets: Iterable[Tuple[x509.GeneralName, bool]]
if isinstance(params, list):
targets = (
ACTargetsPlugin._parse_target(arch.entities, t)
Expand Down Expand Up @@ -490,6 +491,8 @@ def extensions_for_self(self, arch: 'PKIArchitecture', profile_params: Any,
"'simple-ca' can only be used on public-key certificates"
)
profile_params = self._normalise_params(profile_params)

bc_value: Dict[str, Any]
bc_value = {'ca': True}
try:
path_len = int(profile_params['max-path-len'])
Expand Down
1 change: 1 addition & 0 deletions certomancer/integrations/alchemist.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ def open_pkcs11_session(lib_location: str,
f'Token in slot {slot_no} is not {token_label}.'
)

kwargs: Dict[str, Any]
kwargs = {'rw': rw}
if pin is not None:
kwargs['so_pin' if as_so else 'user_pin'] = pin
Expand Down
21 changes: 10 additions & 11 deletions certomancer/integrations/animator.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,6 @@ def __init__(self, architectures: AnimatorArchStore,
self.fixed_time = at_time
self.architectures = architectures
self.with_web_ui = with_web_ui
self.url_map = None
self.allow_time_override = allow_time_override

self.url_map = Map(
Expand Down Expand Up @@ -329,10 +328,10 @@ def serve_cert(self, _request: Request, *, label: str, arch: str,
cert_label: Optional[str], use_pem):
mime = 'application/x-pem-file' if use_pem else 'application/pkix-cert'
pki_arch = self.architectures[ArchLabel(arch)]
cert_label = CertLabel(cert_label) if cert_label is not None else None
cert_lbl = CertLabel(cert_label) if cert_label is not None else None

cert = pki_arch.service_registry.get_cert_from_repo(
ServiceLabel(label), cert_label
ServiceLabel(label), cert_lbl
)
if cert is None:
raise NotFound()
Expand Down Expand Up @@ -419,18 +418,18 @@ def serve_plugin(self, request: Request, plugin_label: str, *, label: str,
arch: str):
pki_arch = self.architectures[ArchLabel(arch)]
services = pki_arch.service_registry
plugin_label = PluginLabel(plugin_label)
label = ServiceLabel(label)
plugin_lbl = PluginLabel(plugin_label)
svc_lbl = ServiceLabel(label)
try:
plugin_info = services.get_plugin_info(plugin_label, label)
plugin_info = services.get_plugin_info(plugin_lbl, svc_lbl)
except ConfigurationError:
raise NotFound()

content_type = plugin_info.content_type
req_content = request.stream.read()
try:
response_bytes = services.invoke_plugin(
plugin_label, label, req_content, at_time=self.at_time(request)
plugin_lbl, svc_lbl, req_content, at_time=self.at_time(request)
)
except PluginServiceRequestError as e:
raise BadRequest(e.user_msg)
Expand All @@ -456,14 +455,14 @@ def serve_pfx(self, request: Request, *, arch):
except KeyError:
raise BadRequest()

cert = CertLabel(cert)
cert_label = CertLabel(cert)
if not (pyca_cryptography_present() and
pki_arch.is_subject_key_available(cert)):
pki_arch.is_subject_key_available(cert_label)):
raise NotFound()

pass_bytes = request.form.get('passphrase', '').encode('utf8')
data = pki_arch.package_pkcs12(cert, password=pass_bytes or None)
cd_header = f'attachment; filename="{cert}.pfx"'
data = pki_arch.package_pkcs12(cert_label, password=pass_bytes or None)
cd_header = f'attachment; filename="{cert_label}.pfx"'
return Response(data, mimetype='application/x-pkcs12',
headers={'Content-Disposition': cd_header})

Expand Down
Empty file added certomancer/py.typed
Empty file.
9 changes: 6 additions & 3 deletions certomancer/registry/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,16 @@ def __init__(self, config, key_search_dir: str,
"'pki-architectures' must be present in configuration"
) from e

if config_search_dir is not None:
config_search_dir = SearchDir(config_search_dir)
search_dir = (
SearchDir(config_search_dir)
if config_search_dir is not None
else None
)
self.pki_archs = {
arch.arch_label: arch
for arch in PKIArchitecture.build_architectures(
key_sets, arch_cfgs, external_url_prefix=external_url_prefix,
config_search_dir=config_search_dir
config_search_dir=search_dir
)
}

Expand Down
4 changes: 2 additions & 2 deletions certomancer/registry/issued/attr_cert.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import hashlib
from dataclasses import dataclass
from typing import Optional, List, TYPE_CHECKING
from typing import Optional, List, TYPE_CHECKING, Dict, Any

from asn1crypto import x509, cms, keys

Expand Down Expand Up @@ -85,7 +85,7 @@ def process_entries(cls, config_dict):
pass

def to_asn1(self, arch: 'PKIArchitecture') -> cms.Holder:
result = {}
result: Dict[str, Any] = {}
holder_cert_label = self.cert \
or arch.get_unique_cert_for_entity(self.name)
holder_cert: x509.Certificate = arch.get_cert(holder_cert_label)
Expand Down
12 changes: 9 additions & 3 deletions certomancer/registry/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,23 @@ def _load(self):
@property
def public_key_info(self) -> PublicKeyInfo:
self._load()
return self._key.public
key = self._key
assert key is not None
return key.public

@property
def private_key_info(self) -> Optional[PrivateKeyInfo]:
self._load()
return self._key.private
key = self._key
assert key is not None
return key.private

@property
def key_pair(self) -> AsymKey:
self._load()
return self._key
key = self._key
assert key is not None
return key


class KeySet:
Expand Down
26 changes: 21 additions & 5 deletions certomancer/registry/pki_arch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Optional, List, Dict, Iterable, Tuple
from typing import Optional, List, Dict, Iterable, Tuple, Union, Any
from zipfile import ZipFile

import yaml
from asn1crypto import x509, core, pem, ocsp, crl, cms
import tzlocal
from cryptography.hazmat.primitives._serialization import KeySerializationEncryption

from . import plugin_api
from .common import ArchLabel, ServiceLabel, CertLabel, KeyLabel, EntityLabel, \
Expand Down Expand Up @@ -292,13 +293,15 @@ def build_architecture(cls, arch_label: ArchLabel, cfg: dict,
)

@classmethod
def build_architectures(cls, key_sets: KeySets, cfgs, external_url_prefix,
def build_architectures(cls, key_sets: KeySets,
cfgs: Dict[str, Any],
external_url_prefix: str,
config_search_dir: Optional[SearchDir],
extension_plugins: ExtensionPluginRegistry = None,
service_plugins: 'ServicePluginRegistry' = None):
arch_specs = {}
for arch_label, cfg in cfgs.items():
arch_label = ArchLabel(arch_label)
arch_specs: Dict[ArchLabel, Dict[str, Any]] = {}
for lbl, cfg in cfgs.items():
arch_label = ArchLabel(lbl)
# external config
if isinstance(cfg, str):
if config_search_dir is None:
Expand Down Expand Up @@ -510,6 +513,7 @@ def enumerate_attr_certs_by_issuer(self) \

def enumerate_attr_certs_of_holder(self, holder_name: EntityLabel,
issuer: Optional[EntityLabel] = None):
relevant: Iterable[CertLabel]
# slow, but eh, it'll do
if issuer is None:
relevant = itertools.chain(*self._ac_labels_by_issuer.values())
Expand Down Expand Up @@ -539,6 +543,9 @@ def package_pkcs12(self, cert_label: CertLabel,
pkcs12, load_der_private_key, NoEncryption,
BestAvailableEncryption
)
from cryptography.hazmat.primitives.asymmetric import (
rsa, dsa, ec, ed25519, ed448
)
from cryptography import x509 as pyca_x509
except ImportError as e: # pragma: nocover
raise CertomancerServiceError(
Expand All @@ -559,8 +566,16 @@ def package_pkcs12(self, cert_label: CertLabel,
# convert DER to pyca/cryptography internal objects
cert = pyca_x509.load_der_x509_certificate(cert_der)
key = load_der_private_key(key_der, password=None)
assert isinstance(key, (
rsa.RSAPrivateKey,
dsa.DSAPrivateKey,
ec.EllipticCurvePrivateKey,
ed25519.Ed25519PrivateKey,
ed448.Ed448PrivateKey,
))
chain = [pyca_x509.load_der_x509_certificate(c) for c in chain_der]

encryption_alg: KeySerializationEncryption
if not password:
encryption_alg = NoEncryption()
else:
Expand Down Expand Up @@ -1154,6 +1169,7 @@ def _check_repo_membership(self, repo_info: BaseCertRepoServiceInfo,
cert_label: CertLabel, is_attr=False):
# check if the cert in question actually belongs to the repo
# (i.e. whether it is issued by the right entity)
cert_spec: IssuedItemSpec
if is_attr:
cert_spec = self.pki_arch.get_attr_cert_spec(cert_label)
else:
Expand Down
16 changes: 9 additions & 7 deletions certomancer/registry/plugin_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ class ExtensionPlugin(abc.ABC):
Plugins must be stateless.
"""

schema_label: str = None
extension_type: Type[ObjectIdentifier] = None
schema_label: str
extension_type: Optional[Type[ObjectIdentifier]] = None

def provision(self, extn_id: Optional[ObjectIdentifier],
arch: 'PKIArchitecture', params):
Expand Down Expand Up @@ -152,11 +152,13 @@ def process_value(self, extn_id: str,
f"There is no registered plugin for the schema "
f"'{spec.schema}'."
) from e

extn_oid: Optional[ObjectIdentifier]
if proc.extension_type is not None:
extn_id = proc.extension_type(extn_id)
extn_oid = proc.extension_type(extn_id)
else:
extn_id = None
provisioned_value = proc.provision(extn_id, arch, spec.params)
extn_oid = None
provisioned_value = proc.provision(extn_oid, arch, spec.params)
if isinstance(provisioned_value, core.Asn1Value) and \
not isinstance(provisioned_value, core.ParsableOctetString):
# this allows plugins to keep working with extensions for which
Expand All @@ -169,7 +171,7 @@ def process_value(self, extn_id: str,
class AttributePlugin(abc.ABC):
# FIXME give attribute plugins an API to determine how they want
# to handle multivalued attrs (repeated invocation or in bulk)
schema_label: str = None
schema_label: str

def provision(self, attr_id: Optional[ObjectIdentifier],
arch: 'PKIArchitecture', params):
Expand Down Expand Up @@ -358,7 +360,7 @@ class ServicePlugin(abc.ABC):
course always implement a no-op :meth:`invoke`, and wrap the Animator
WSGI application to intercept requests as necessary.
"""
plugin_label: str = None
plugin_label: str

content_type: str = 'application/octet-stream'
"""
Expand Down
7 changes: 2 additions & 5 deletions certomancer/registry/svc_config/ocsp.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,11 @@ class OCSPResponderServiceInfo(ServiceInfo):
"""

signing_key: Optional[KeyLabel] = None
signing_key: KeyLabel
"""
Key to use to sign the OCSP response.
Will be derived from ``responder_cert`` if not specified.
.. note::
This option exists only to allow invalid OCSP responses to be created.
Will be derived from ``responder_cert`` if not specified in config.
"""

signature_algo: Optional[str] = None
Expand Down
5 changes: 3 additions & 2 deletions certomancer/registry/svc_config/tsa.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ class TSAServiceInfo(ServiceInfo):
Label of the signer's certificate.
"""

signing_key: Optional[KeyLabel] = None
signing_key: KeyLabel
"""
Key to sign responses with. Ordinarily derived from :attr:`signing_cert`.
Key to sign responses with. Ordinarily derived from :attr:`signing_cert`
when not specified in config.
"""

signature_algo: Optional[str] = None
Expand Down
Loading

0 comments on commit 6009b8c

Please sign in to comment.