Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x509_crl_info: move main code to module_utils to allow easier implementation of diff mode #203

Merged
merged 4 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelogs/fragments/203-x509_crl_info.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
minor_changes:
- "x509_crl_info - refactor module to allow code re-use for diff mode (https://github.com/ansible-collections/community.crypto/pull/203)."
99 changes: 99 additions & 0 deletions plugins/module_utils/crypto/module_backends/crl_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
# -*- coding: utf-8 -*-
#
# Copyright: (c) 2020, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type


import traceback

from distutils.version import LooseVersion

from ansible.module_utils.basic import missing_required_lib

from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_support import (
cryptography_oid_to_name,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.cryptography_crl import (
TIMESTAMP_FORMAT,
cryptography_decode_revoked_certificate,
cryptography_dump_revoked,
cryptography_get_signature_algorithm_oid_from_crl,
)

from ansible_collections.community.crypto.plugins.module_utils.crypto.pem import (
identify_pem_format,
)

# crypto_utils

MINIMAL_CRYPTOGRAPHY_VERSION = '1.2'

CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
from cryptography import x509
from cryptography.hazmat.backends import default_backend
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True


class CRLInfoRetrieval(object):
def __init__(self, module, content):
# content must be a bytes string
self.module = module
self.content = content

def get_info(self):
self.crl_pem = identify_pem_format(self.content)
try:
if self.crl_pem:
self.crl = x509.load_pem_x509_crl(self.content, default_backend())
else:
self.crl = x509.load_der_x509_crl(self.content, default_backend())
except Exception as e:
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
self.module.fail_json(msg='Error while decoding CRL: {0}'.format(e))

result = {
'changed': False,
'format': 'pem' if self.crl_pem else 'der',
'last_update': None,
'next_update': None,
'digest': None,
'issuer_ordered': None,
'issuer': None,
'revoked_certificates': [],
}

result['last_update'] = self.crl.last_update.strftime(TIMESTAMP_FORMAT)
result['next_update'] = self.crl.next_update.strftime(TIMESTAMP_FORMAT)
result['digest'] = cryptography_oid_to_name(cryptography_get_signature_algorithm_oid_from_crl(self.crl))
issuer = []
for attribute in self.crl.issuer:
issuer.append([cryptography_oid_to_name(attribute.oid), attribute.value])
result['issuer_ordered'] = issuer
result['issuer'] = {}
for k, v in issuer:
result['issuer'][k] = v
result['revoked_certificates'] = []
for cert in self.crl:
entry = cryptography_decode_revoked_certificate(cert)
result['revoked_certificates'].append(cryptography_dump_revoked(entry))

return result


def get_crl_info(module, content):
if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
exception=CRYPTOGRAPHY_IMP_ERR)

info = CRLInfoRetrieval(module, content)
return info.get_info()
120 changes: 18 additions & 102 deletions plugins/modules/x509_crl_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,8 @@


import base64
import traceback

from distutils.version import LooseVersion

from ansible.module_utils.basic import AnsibleModule, missing_required_lib
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native

from ansible_collections.community.crypto.plugins.module_utils.crypto.basic import (
Expand All @@ -164,99 +161,9 @@
identify_pem_format,
)

felixfontein marked this conversation as resolved.
Show resolved Hide resolved
# crypto_utils

MINIMAL_CRYPTOGRAPHY_VERSION = '1.2'

CRYPTOGRAPHY_IMP_ERR = None
try:
import cryptography
from cryptography import x509
from cryptography.hazmat.backends import default_backend
CRYPTOGRAPHY_VERSION = LooseVersion(cryptography.__version__)
except ImportError:
CRYPTOGRAPHY_IMP_ERR = traceback.format_exc()
CRYPTOGRAPHY_FOUND = False
else:
CRYPTOGRAPHY_FOUND = True


class CRLError(OpenSSLObjectError):
pass


class CRLInfo(OpenSSLObject):
"""The main module implementation."""

def __init__(self, module):
super(CRLInfo, self).__init__(
module.params['path'] or '',
'present',
False,
module.check_mode
)

self.content = module.params['content']

self.module = module

self.crl = None
if self.content is None:
try:
with open(self.path, 'rb') as f:
data = f.read()
except Exception as e:
self.module.fail_json(msg='Error while reading CRL file from disk: {0}'.format(e))
else:
data = self.content.encode('utf-8')
if not identify_pem_format(data):
data = base64.b64decode(self.content)

self.crl_pem = identify_pem_format(data)
try:
if self.crl_pem:
self.crl = x509.load_pem_x509_crl(data, default_backend())
else:
self.crl = x509.load_der_x509_crl(data, default_backend())
except Exception as e:
self.module.fail_json(msg='Error while decoding CRL: {0}'.format(e))

def get_info(self):
result = {
'changed': False,
'format': 'pem' if self.crl_pem else 'der',
'last_update': None,
'next_update': None,
'digest': None,
'issuer_ordered': None,
'issuer': None,
'revoked_certificates': [],
}

result['last_update'] = self.crl.last_update.strftime(TIMESTAMP_FORMAT)
result['next_update'] = self.crl.next_update.strftime(TIMESTAMP_FORMAT)
result['digest'] = cryptography_oid_to_name(cryptography_get_signature_algorithm_oid_from_crl(self.crl))
issuer = []
for attribute in self.crl.issuer:
issuer.append([cryptography_oid_to_name(attribute.oid), attribute.value])
result['issuer_ordered'] = issuer
result['issuer'] = {}
for k, v in issuer:
result['issuer'][k] = v
result['revoked_certificates'] = []
for cert in self.crl:
entry = cryptography_decode_revoked_certificate(cert)
result['revoked_certificates'].append(cryptography_dump_revoked(entry))

return result

def generate(self):
# Empty method because OpenSSLObject wants this
pass

def dump(self):
# Empty method because OpenSSLObject wants this
pass
from ansible_collections.community.crypto.plugins.module_utils.crypto.module_backends.crl_info import (
get_crl_info,
)


def main():
Expand All @@ -274,13 +181,22 @@ def main():
supports_check_mode=True,
)

if not CRYPTOGRAPHY_FOUND:
module.fail_json(msg=missing_required_lib('cryptography >= {0}'.format(MINIMAL_CRYPTOGRAPHY_VERSION)),
exception=CRYPTOGRAPHY_IMP_ERR)
if module.params['content'] is None:
try:
with open(module.params['path'], 'rb') as f:
data = f.read()
except Exception as e:
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
module.fail_json(msg='Error while reading CRL file from disk: {0}'.format(e))
else:
data = module.params['content'].encode('utf-8')
if not identify_pem_format(data):
try:
data = base64.b64decode(module.params['content'])
except Exception as e:
felixfontein marked this conversation as resolved.
Show resolved Hide resolved
module.fail_json(msg='Error while Base64 decoding content: {0}'.format(e))

try:
crl = CRLInfo(module)
result = crl.get_info()
result = get_crl_info(module, data)
module.exit_json(**result)
except OpenSSLObjectError as e:
module.fail_json(msg=to_native(e))
Expand Down
1 change: 1 addition & 0 deletions tests/integration/targets/x509_crl/meta/main.yml
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
dependencies:
- setup_openssl
- setup_pyopenssl # the x509_crl* modules don't need this, but the other modules using during the tests do in some situations