Skip to content

Commit

Permalink
ACME: improve acme_certificate docs, include cert_id in acme_certific…
Browse files Browse the repository at this point in the history
…ate_renewal_info return value (ansible-collections#747)

* Use community.dns.quote_txt filter instead of regex replace to quote TXT entry value.

* Fix documentation of acme_certificate's challenge_data return value.

* Also return cert_id from acme_certificate_renewal_info module.

* The cert ID cannot be computed if the certificate has no AKI.

This happens with older Pebble versions, which are used when
testing against older ansible-core/-base/Ansible versions.

* Fix AKI extraction for older OpenSSL versions.
  • Loading branch information
felixfontein committed May 4, 2024
1 parent 59606d4 commit 553ab45
Show file tree
Hide file tree
Showing 14 changed files with 323 additions and 115 deletions.
4 changes: 3 additions & 1 deletion plugins/module_utils/acme/acme.py
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ def get_request(self, uri, parse_json_result=True, headers=None, get_only=False,

def get_renewal_info(
self,
cert_id=None,
cert_info=None,
cert_filename=None,
cert_content=None,
Expand All @@ -399,7 +400,8 @@ def get_renewal_info(
if not self.directory.has_renewal_info_endpoint():
raise ModuleFailException('The ACME endpoint does not support ACME Renewal Information retrieval')

cert_id = compute_cert_id(self.backend, cert_info=cert_info, cert_filename=cert_filename, cert_content=cert_content)
if cert_id is None:
cert_id = compute_cert_id(self.backend, cert_info=cert_info, cert_filename=cert_filename, cert_content=cert_content)
url = '{base}{cert_id}'.format(base=self.directory.directory['renewalInfo'], cert_id=cert_id)

data, info = self.get_request(url, parse_json_result=True, fail_on_error=True, get_only=True)
Expand Down
12 changes: 6 additions & 6 deletions plugins/module_utils/acme/backend_openssl_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ def _decode_octets(octets_text):
return binascii.unhexlify(re.sub(r"(\s|:)", "", octets_text).encode("utf-8"))


def _extract_octets(out_text, name, required=True):
match = re.search(
r"\s+%s:\s*\n\s+([A-Fa-f0-9]{2}(?::[A-Fa-f0-9]{2})*)\s*\n" % name,
out_text,
re.MULTILINE | re.DOTALL,
def _extract_octets(out_text, name, required=True, potential_prefixes=None):
regexp = r"\s+%s:\s*\n\s+%s([A-Fa-f0-9]{2}(?::[A-Fa-f0-9]{2})*)\s*\n" % (
name,
('(?:%s)' % '|'.join(re.escape(pp) for pp in potential_prefixes)) if potential_prefixes else '',
)
match = re.search(regexp, out_text, re.MULTILINE | re.DOTALL)
if match is not None:
return _decode_octets(match.group(1))
if not required:
Expand Down Expand Up @@ -379,7 +379,7 @@ def get_cert_information(self, cert_filename=None, cert_content=None):
serial = convert_bytes_to_int(_extract_octets(out_text, 'Serial Number', required=True))

ski = _extract_octets(out_text, 'X509v3 Subject Key Identifier', required=False)
aki = _extract_octets(out_text, 'X509v3 Authority Key Identifier', required=False)
aki = _extract_octets(out_text, 'X509v3 Authority Key Identifier', required=False, potential_prefixes=['keyid:', ''])

return CertificateInformation(
not_valid_after=not_after,
Expand Down
2 changes: 1 addition & 1 deletion plugins/module_utils/acme/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def compute_cert_id(backend, cert_info=None, cert_filename=None, cert_content=No

# Convert Authority Key Identifier to string
if cert_info.authority_key_identifier is None:
raise ModuleFailException('Module has no Authority Key Identifier extension')
raise ModuleFailException('Certificate has no Authority Key Identifier extension')
aki = to_native(base64.urlsafe_b64encode(cert_info.authority_key_identifier)).replace('=', '')

# Convert serial number to string
Expand Down
80 changes: 48 additions & 32 deletions plugins/modules/acme_certificate.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@
# state: present
# wait: true
# # Note: route53 requires TXT entries to be enclosed in quotes
# value: "{{ sample_com_challenge.challenge_data['sample.com']['dns-01'].resource_value | regex_replace('^(.*)$', '\"\\1\"') }}"
# value: "{{ sample_com_challenge.challenge_data['sample.com']['dns-01'].resource_value | community.dns.quote_txt(always_quote=true) }}"
# when: sample_com_challenge is changed and 'sample.com' in sample_com_challenge.challenge_data
#
# Alternative way:
Expand All @@ -419,7 +419,7 @@
# wait: true
# # Note: item.value is a list of TXT entries, and route53
# # requires every entry to be enclosed in quotes
# value: "{{ item.value | map('regex_replace', '^(.*)$', '\"\\1\"' ) | list }}"
# value: "{{ item.value | map('community.dns.quote_txt', always_quote=true) | list }}"
# loop: "{{ sample_com_challenge.challenge_data_dns | dict2items }}"
# when: sample_com_challenge is changed
Expand Down Expand Up @@ -475,39 +475,55 @@
- Per identifier / challenge type challenge data.
- Since Ansible 2.8.5, only challenges which are not yet valid are returned.
returned: changed
type: list
elements: dict
type: dict
contains:
resource:
description: The challenge resource that must be created for validation.
returned: changed
type: str
sample: .well-known/acme-challenge/evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA
resource_original:
identifier:
description:
- The original challenge resource including type identifier for V(tls-alpn-01)
challenges.
returned: changed and O(challenge) is V(tls-alpn-01)
type: str
sample: DNS:example.com
resource_value:
description:
- The value the resource has to produce for the validation.
- For V(http-01) and V(dns-01) challenges, the value can be used as-is.
- "For V(tls-alpn-01) challenges, note that this return value contains a
Base64 encoded version of the correct binary blob which has to be put
into the acmeValidation x509 extension; see
U(https://www.rfc-editor.org/rfc/rfc8737.html#section-3)
for details. To do this, you might need the P(ansible.builtin.b64decode#filter) Jinja filter
to extract the binary blob from this return value."
- For every identifier, provides a dictionary of challenge types mapping to challenge data.
- The keys in this dictionary the identifiers. C(identifier) is a placeholder used in the documentation.
- Note that the keys are not valid Jinja2 identifiers.
returned: changed
type: str
sample: IlirfxKKXA...17Dt3juxGJ-PCt92wr-oA
record:
description: The full DNS record's name for the challenge.
returned: changed and challenge is V(dns-01)
type: str
sample: _acme-challenge.example.com
type: dict
contains:
challenge-type:
description:
- Data for every challenge type.
- The keys in this dictionary the challenge types. C(challenge-type) is a placeholder used in the documentation.
Possible keys are V(http-01), V(dns-01), and V(tls-alpn-01).
- Note that the keys are not valid Jinja2 identifiers.
returned: changed
type: dict
contains:
resource:
description: The challenge resource that must be created for validation.
returned: changed
type: str
sample: .well-known/acme-challenge/evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA
resource_original:
description:
- The original challenge resource including type identifier for V(tls-alpn-01)
challenges.
returned: changed and O(challenge) is V(tls-alpn-01)
type: str
sample: DNS:example.com
resource_value:
description:
- The value the resource has to produce for the validation.
- For V(http-01) and V(dns-01) challenges, the value can be used as-is.
- "For V(tls-alpn-01) challenges, note that this return value contains a
Base64 encoded version of the correct binary blob which has to be put
into the acmeValidation x509 extension; see
U(https://www.rfc-editor.org/rfc/rfc8737.html#section-3)
for details. To do this, you might need the P(ansible.builtin.b64decode#filter) Jinja filter
to extract the binary blob from this return value."
returned: changed
type: str
sample: IlirfxKKXA...17Dt3juxGJ-PCt92wr-oA
record:
description: The full DNS record's name for the challenge.
returned: changed and challenge is V(dns-01)
type: str
sample: _acme-challenge.example.com
challenge_data_dns:
description:
- List of TXT values per DNS record, in case challenge is V(dns-01).
Expand Down
100 changes: 43 additions & 57 deletions plugins/modules/acme_certificate_renewal_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,13 @@
returned: success
type: bool
sample: true
cert_id:
description:
- The certificate ID according to the L(ARI specification, https://www.ietf.org/archive/id/draft-ietf-acme-ari-03.html#section-4.1).
returned: success, the certificate exists, and has an Authority Key Identifier X.509 extension
type: str
sample: aYhba4dGQEHhs3uEe6CuLN4ByNQ.AIdlQyE
'''

import os
Expand All @@ -134,6 +141,8 @@

from ansible_collections.community.crypto.plugins.module_utils.acme.errors import ModuleFailException

from ansible_collections.community.crypto.plugins.module_utils.acme.utils import compute_cert_id


def main():
argument_spec = get_default_argspec(with_account=False)
Expand All @@ -155,109 +164,86 @@ def main():
)
backend = create_backend(module, True)

result = dict(
changed=False,
msg='The certificate is still valid and no condition was reached',
supports_ari=False,
)

def complete(should_renew, **kwargs):
result['should_renew'] = should_renew
result.update(kwargs)
module.exit_json(**result)

if not module.params['certificate_path'] and not module.params['certificate_content']:
module.exit_json(
changed=False,
should_renew=True,
msg='No certificate was specified',
supports_ari=False,
)
complete(True, msg='No certificate was specified')

if module.params['certificate_path'] is not None and not os.path.exists(module.params['certificate_path']):
module.exit_json(
changed=False,
should_renew=True,
msg='The certificate file does not exist',
supports_ari=False,
)
complete(True, msg='The certificate file does not exist')

try:
cert_info = backend.get_cert_information(
cert_filename=module.params['certificate_path'],
cert_content=module.params['certificate_content'],
)
cert_id = None
if cert_info.authority_key_identifier is not None:
cert_id = compute_cert_id(backend, cert_info=cert_info)
if cert_id is not None:
result['cert_id'] = cert_id

if module.params['now']:
now = backend.parse_module_parameter(module.params['now'], 'now')
else:
now = backend.get_now()

no_renewal_msg = 'The certificate is still valid and no condition was reached'
renewal_ari = False

if now >= cert_info.not_valid_after:
module.exit_json(
changed=False,
should_renew=True,
msg='The certificate already expired',
supports_ari=False,
)
complete(True, msg='The certificate has already expired')

client = ACMEClient(module, backend)
if client.directory.has_renewal_info_endpoint():
renewal_info = client.get_renewal_info(cert_info=cert_info)
if cert_id is not None and module.params['use_ari'] and client.directory.has_renewal_info_endpoint():
renewal_info = client.get_renewal_info(cert_id=cert_id)
window_start = backend.parse_acme_timestamp(renewal_info['suggestedWindow']['start'])
window_end = backend.parse_acme_timestamp(renewal_info['suggestedWindow']['end'])
msg_append = ''
if 'explanationURL' in renewal_info:
msg_append = '. Information on renewal interval: {0}'.format(renewal_info['explanationURL'])
renewal_ari = True
result['supports_ari'] = True
if now > window_end:
module.exit_json(
changed=False,
should_renew=True,
msg='The suggested renewal interval provided by ARI is in the past{0}'.format(msg_append),
supports_ari=True,
)
complete(True, msg='The suggested renewal interval provided by ARI is in the past{0}'.format(msg_append))
if module.params['ari_algorithm'] == 'start':
if now > window_start:
module.exit_json(
changed=False,
should_renew=True,
msg='The suggested renewal interval provided by ARI has begun{0}'.format(msg_append),
supports_ari=True,
)
complete(True, msg='The suggested renewal interval provided by ARI has begun{0}'.format(msg_append))
else:
random_time = backend.interpolate_timestamp(window_start, window_end, random.random())
if now > random_time:
module.exit_json(
changed=False,
should_renew=True,
msg='The picked random renewal time {0} in sugested renewal internal provided by ARI is in the past{1}'.format(random_time, msg_append),
supports_ari=True,
complete(
True,
msg='The picked random renewal time {0} in sugested renewal internal provided by ARI is in the past{1}'.format(
random_time,
msg_append,
),
)

# TODO check remaining_days
if module.params['remaining_days'] is not None:
remaining_days = (cert_info.not_valid_after - now).days
if remaining_days < module.params['remaining_days']:
module.exit_json(
changed=False,
should_renew=True,
msg='The certificate expires in {0} days'.format(remaining_days),
supports_ari=False,
)
complete(True, msg='The certificate expires in {0} days'.format(remaining_days))

# TODO check remaining_percentage
if module.params['remaining_percentage'] is not None:
timestamp = backend.interpolate_timestamp(cert_info.not_valid_before, cert_info.not_valid_after, 1 - module.params['remaining_percentage'])
if timestamp < now:
module.exit_json(
changed=False,
should_renew=True,
complete(
True,
msg="The remaining percentage {0}% of the certificate's lifespan was reached on {1}".format(
module.params['remaining_percentage'] * 100,
timestamp,
),
supports_ari=False,
)

module.exit_json(
changed=False,
should_renew=False,
msg=no_renewal_msg,
supports_ari=renewal_ari,
)
complete(False)
except ModuleFailException as e:
e.do_fail(module)

Expand Down
Loading

0 comments on commit 553ab45

Please sign in to comment.