diff --git a/changelogs/fragments/139-improve-error-handling.yml b/changelogs/fragments/139-improve-error-handling.yml new file mode 100644 index 000000000..832ed8580 --- /dev/null +++ b/changelogs/fragments/139-improve-error-handling.yml @@ -0,0 +1,2 @@ +bugfixes: + - "support code - improve handling of certificate and certificate signing request (CSR) loading with the ``cryptography`` backend when errors occur (https://github.com/ansible-collections/community.crypto/issues/138, https://github.com/ansible-collections/community.crypto/pull/139)." diff --git a/plugins/module_utils/crypto/support.py b/plugins/module_utils/crypto/support.py index 20b1dde1d..7850c6bb9 100644 --- a/plugins/module_utils/crypto/support.py +++ b/plugins/module_utils/crypto/support.py @@ -131,56 +131,56 @@ def load_privatekey(path, passphrase=None, check_passphrase=True, content=None, priv_key_detail = b_priv_key_fh.read() else: priv_key_detail = content + except (IOError, OSError) as exc: + raise OpenSSLObjectError(exc) - if backend == 'pyopenssl': + if backend == 'pyopenssl': - # First try: try to load with real passphrase (resp. empty string) - # Will work if this is the correct passphrase, or the key is not - # password-protected. + # First try: try to load with real passphrase (resp. empty string) + # Will work if this is the correct passphrase, or the key is not + # password-protected. + try: + result = crypto.load_privatekey(crypto.FILETYPE_PEM, + priv_key_detail, + to_bytes(passphrase or '')) + except crypto.Error as e: + if len(e.args) > 0 and len(e.args[0]) > 0: + if e.args[0][0][2] in ('bad decrypt', 'bad password read'): + # This happens in case we have the wrong passphrase. + if passphrase is not None: + raise OpenSSLBadPassphraseError('Wrong passphrase provided for private key!') + else: + raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!') + raise OpenSSLObjectError('Error while deserializing key: {0}'.format(e)) + if check_passphrase: + # Next we want to make sure that the key is actually protected by + # a passphrase (in case we did try the empty string before, make + # sure that the key is not protected by the empty string) try: - result = crypto.load_privatekey(crypto.FILETYPE_PEM, - priv_key_detail, - to_bytes(passphrase or '')) + crypto.load_privatekey(crypto.FILETYPE_PEM, + priv_key_detail, + to_bytes('y' if passphrase == 'x' else 'x')) + if passphrase is not None: + # Since we can load the key without an exception, the + # key isn't password-protected + raise OpenSSLBadPassphraseError('Passphrase provided, but private key is not password-protected!') except crypto.Error as e: - if len(e.args) > 0 and len(e.args[0]) > 0: + if passphrase is None and len(e.args) > 0 and len(e.args[0]) > 0: if e.args[0][0][2] in ('bad decrypt', 'bad password read'): - # This happens in case we have the wrong passphrase. - if passphrase is not None: - raise OpenSSLBadPassphraseError('Wrong passphrase provided for private key!') - else: - raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!') - raise OpenSSLObjectError('Error while deserializing key: {0}'.format(e)) - if check_passphrase: - # Next we want to make sure that the key is actually protected by - # a passphrase (in case we did try the empty string before, make - # sure that the key is not protected by the empty string) - try: - crypto.load_privatekey(crypto.FILETYPE_PEM, - priv_key_detail, - to_bytes('y' if passphrase == 'x' else 'x')) - if passphrase is not None: - # Since we can load the key without an exception, the - # key isn't password-protected - raise OpenSSLBadPassphraseError('Passphrase provided, but private key is not password-protected!') - except crypto.Error as e: - if passphrase is None and len(e.args) > 0 and len(e.args[0]) > 0: - if e.args[0][0][2] in ('bad decrypt', 'bad password read'): - # The key is obviously protected by the empty string. - # Don't do this at home (if it's possible at all)... - raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!') - elif backend == 'cryptography': - try: - result = load_pem_private_key(priv_key_detail, - None if passphrase is None else to_bytes(passphrase), - cryptography_backend()) - except TypeError: - raise OpenSSLBadPassphraseError('Wrong or empty passphrase provided for private key') - except ValueError: - raise OpenSSLBadPassphraseError('Wrong passphrase provided for private key') + # The key is obviously protected by the empty string. + # Don't do this at home (if it's possible at all)... + raise OpenSSLBadPassphraseError('No passphrase provided, but private key is password-protected!') + elif backend == 'cryptography': + try: + result = load_pem_private_key(priv_key_detail, + None if passphrase is None else to_bytes(passphrase), + cryptography_backend()) + except TypeError: + raise OpenSSLBadPassphraseError('Wrong or empty passphrase provided for private key') + except ValueError: + raise OpenSSLBadPassphraseError('Wrong passphrase provided for private key') - return result - except (IOError, OSError) as exc: - raise OpenSSLObjectError(exc) + return result def load_certificate(path, content=None, backend='pyopenssl'): @@ -192,12 +192,15 @@ def load_certificate(path, content=None, backend='pyopenssl'): cert_content = cert_fh.read() else: cert_content = content - if backend == 'pyopenssl': - return crypto.load_certificate(crypto.FILETYPE_PEM, cert_content) - elif backend == 'cryptography': - return x509.load_pem_x509_certificate(cert_content, cryptography_backend()) except (IOError, OSError) as exc: raise OpenSSLObjectError(exc) + if backend == 'pyopenssl': + return crypto.load_certificate(crypto.FILETYPE_PEM, cert_content) + elif backend == 'cryptography': + try: + return x509.load_pem_x509_certificate(cert_content, cryptography_backend()) + except ValueError as exc: + raise OpenSSLObjectError(exc) def load_certificate_request(path, content=None, backend='pyopenssl'): @@ -213,7 +216,10 @@ def load_certificate_request(path, content=None, backend='pyopenssl'): if backend == 'pyopenssl': return crypto.load_certificate_request(crypto.FILETYPE_PEM, csr_content) elif backend == 'cryptography': - return x509.load_pem_x509_csr(csr_content, cryptography_backend()) + try: + return x509.load_pem_x509_csr(csr_content, cryptography_backend()) + except ValueError as exc: + raise OpenSSLObjectError(exc) def parse_name_field(input_dict):