Skip to content

Commit

Permalink
rewrite and split into 2 modules instead
Browse files Browse the repository at this point in the history
  • Loading branch information
MarkusTeufelberger committed Jul 28, 2020
1 parent 08bb551 commit d2f60d2
Show file tree
Hide file tree
Showing 7 changed files with 577 additions and 143 deletions.
216 changes: 77 additions & 139 deletions plugins/modules/openssl_signature.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,44 +11,41 @@
DOCUMENTATION = r'''
---
module: openssl_signature
short_description: Sign and verify data with openssl
short_description: Sign data with openssl
description:
- This module allows one to sign and verify data via a certificate and a private key
- This module allows one to sign data using a private key.
- The module can use the cryptography Python library, or the pyOpenSSL Python
library. By default, it tries to detect which one is available. This can be
overridden with the I(select_crypto_backend) option. Please note that the PyOpenSSL backend
was deprecated in Ansible 2.9 and will be removed in Ansible 2.13.
was deprecated in Ansible 2.9 and will be removed in community.crypto 2.0.0.
requirements:
- Either cryptography >= 1.4 (some key types require newer versions)
- Or pyOpenSSL >= 0.11 (Ed25519 and Ed448 keys are not supported with this backend)
author:
- Patrick Pichler (@aveexy)
- Markus Teufelberger (@MarkusTeufelberger)
options:
action:
description: Action to be executed
type: str
required: true
choices: [ sign, verify ]
privatekey_path:
description:
- The path to the private key to use when signing
- The path to the private key to use when signing.
- Either I(privatekey_path) or I(privatekey_content) must be specified, but not both.
type: path
privatekey_content:
description:
- The content of the private key to use when signing the certificate signing request.
- Either I(privatekey_path) or I(privatekey_content) must be specified, but not both.
type: str
privatekey_passphrase:
description:
- The passphrase for the private key.
- This is required if the private key is password protected.
type: str
path:
description: File to sign/verify
description:
- The file to sign.
- This file will only be read and not modified.
type: path
required: true
certificate:
description: Certificate required for verify action
type: path
signature:
description: Base64 encoded signature required for verify action
type: str
select_crypto_backend:
description:
- Determines which crypto backend to use.
Expand All @@ -65,30 +62,34 @@
DSA and ECDSA keys: C(cryptography) >= 1.5
ed448 and ed25519 keys: C(cryptography) >= 2.6
seealso:
- module: x509_certificate
- module: openssl_signature_info
- module: openssl_privatekey
'''

EXAMPLES = r'''
- name: Sign example file
openssl_signature:
action: sign
community.crypto.openssl_signature:
privatekey_path: private.key
path: /tmp/example_file
register: sig
- name: Verify signature of example file
openssl_signature:
action: verify
certificate: cert.pem
community.crypto.openssl_signature_info:
certificate_path: cert.pem
path: /tmp/example_file
signature: "{{ sig.signature }}"
register: verify
- name: Make sure the signature is valid
assert:
that:
- verify.valid
'''

RETURN = r'''
signature:
description: Base64 encoded signature
returned: changed or success
description: Base64 encoded signature.
returned: success
type: str
'''

Expand Down Expand Up @@ -134,7 +135,6 @@

from ansible_collections.community.crypto.plugins.module_utils.crypto.support import (
OpenSSLObject,
load_certificate,
load_privatekey,
)

Expand All @@ -154,11 +154,11 @@ def __init__(self, module, backend):

self.backend = backend

self.action = module.params['action']
self.signature = module.params['signature']
self.passphrase = module.params['privatekey_passphrase']
self.private_key = module.params['privatekey_path']
self.certificate = module.params['certificate']
self.privatekey_path = module.params['privatekey_path']
self.privatekey_content = module.params['privatekey_content']
if self.privatekey_content is not None:
self.privatekey_content = self.privatekey_content.encode('utf-8')
self.privatekey_passphrase = module.params['privatekey_passphrase']

def generate(self):
# Empty method because OpenSSLObject wants this
Expand All @@ -176,33 +176,22 @@ def __init__(self, module, backend):
super(SignaturePyOpenSSL, self).__init__(module, backend)

def run(self):
try:
result = dict()

result = dict()

try:
with open(self.path, "rb") as f:
_in = f.read()

if self.action == "verify":
_signature = base64.b64decode(self.signature)
certificate = load_certificate(self.certificate, backend=self.backend)

try:
OpenSSL.crypto.verify(certificate, _signature, _in, 'sha256')
except Exception:
self.module.fail_json(
msg="Verification failed"
)

elif self.action == "sign":
private_key = load_privatekey(
self.private_key,
None if self.passphrase is None else to_bytes(self.passphrase),
backend=self.backend
)

out = OpenSSL.crypto.sign(private_key, _in, "sha256")
result['signature'] = base64.b64encode(out)
private_key = load_privatekey(
path=self.privatekey_path,
content=self.privatekey_content,
passphrase=self.privatekey_passphrase,
backend=self.backend,
)

signature = OpenSSL.crypto.sign(private_key, _in, "sha256")
result['signature'] = base64.b64encode(signature)
return result
except Exception as e:
raise OpenSSLObjectError(e)
Expand All @@ -224,88 +213,41 @@ def run(self):
with open(self.path, "rb") as f:
_in = f.read()

if self.action == "verify":
_signature = base64.b64decode(self.signature)
public_key = load_certificate(self.certificate, backend=self.backend).public_key()
verified = False

if CRYPTOGRAPHY_HAS_DSA_SIGN:
try:
if isinstance(public_key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPublicKey):
public_key.verify(_signature, _in, _hash)
verified = True
except cryptography.exceptions.InvalidSignature:
self.module.fail_json(
msg="DSA signature verification failed"
)
if CRYPTOGRAPHY_HAS_EC_SIGN:
try:
if isinstance(public_key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey):
public_key.verify(_signature, _in, cryptography.hazmat.primitives.asymmetric.ec.ECDSA(_hash))
verified = True
except cryptography.exceptions.InvalidSignature:
self.module.fail_json(
msg="ECDSA signature verification failed"
)
if CRYPTOGRAPHY_HAS_ED25519_SIGN:
try:
if isinstance(public_key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PublicKey):
public_key.verify(_signature, _in)
verified = True
except cryptography.exceptions.InvalidSignature:
self.module.fail_json(
msg="Ed25519 signature verification failed"
)
if CRYPTOGRAPHY_HAS_ED448_SIGN:
try:
if isinstance(public_key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PublicKey):
public_key.verify(_signature, _in)
verified = True
except cryptography.exceptions.InvalidSignature:
self.module.fail_json(
msg="Ed448 signature verification failed"
)
if CRYPTOGRAPHY_HAS_RSA_SIGN:
try:
if isinstance(public_key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPublicKey):
public_key.verify(_signature, _in, _padding, _hash)
verified = True
except cryptography.exceptions.InvalidSignature:
self.module.fail_json(
msg="RSA signature verification failed"
)
if not verified:
self.module.fail_json(
msg="Unsupported key type"
)

elif self.action == "sign":
private_key = load_privatekey(
self.private_key,
None if self.passphrase is None else to_bytes(self.passphrase),
backend=self.backend
)
private_key = load_privatekey(
path=self.privatekey_path,
content=self.privatekey_content,
passphrase=self.privatekey_passphrase,
backend=self.backend,
)

if isinstance(private_key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
out = private_key.sign(_in, _padding, _hash)
signature = None

elif isinstance(private_key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
out = private_key.sign(_in, _hash)
if CRYPTOGRAPHY_HAS_DSA_SIGN:
if isinstance(private_key, cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey):
signature = private_key.sign(_in, _hash)

elif isinstance(private_key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
out = private_key.sign(_in, cryptography.hazmat.primitives.asymmetric.ec.ECDSA(_hash))
if CRYPTOGRAPHY_HAS_EC_SIGN:
if isinstance(private_key, cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey):
signature = private_key.sign(_in, cryptography.hazmat.primitives.asymmetric.ec.ECDSA(_hash))

elif (isinstance(private_key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey) or
isinstance(private_key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey)):
out = private_key.sign(_in)
if CRYPTOGRAPHY_HAS_ED25519_SIGN:
if isinstance(private_key, cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey):
signature = private_key.sign(_in)

else:
self.module.fail_json(
msg="Unsupported key type"
)
if CRYPTOGRAPHY_HAS_ED448_SIGN:
if isinstance(private_key, cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey):
signature = private_key.sign(_in)

result['signature'] = base64.b64encode(out)
if CRYPTOGRAPHY_HAS_RSA_SIGN:
if isinstance(private_key, cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey):
signature = private_key.sign(_in, _padding, _hash)

if signature is None:
self.module.fail_json(
msg="Unsupported key type. Your cryptography version is {0}".format(CRYPTOGRAPHY_VERSION)
)

result['signature'] = base64.b64encode(signature)
return result

except Exception as e:
Expand All @@ -315,23 +257,19 @@ def run(self):
def main():
module = AnsibleModule(
argument_spec=dict(
action=dict(type='str', choices=['sign', 'verify'], required=True),
privatekey_path=dict(type='path'),
certificate=dict(type='path'),
privatekey_content=dict(type='str'),
privatekey_passphrase=dict(type='str', no_log=True),
path=dict(type='path', required=True),
signature=dict(type='str'),
select_crypto_backend=dict(type='str', choices=['auto', 'pyopenssl', 'cryptography'], default='auto'),
),
mutually_exclusive=(
['privatekey_path', 'privatekey_content'],
),
required_one_of=(
['privatekey_path', 'privatekey_content'],
),
supports_check_mode=True,
required_if=[
['action', 'sign', ['privatekey_path']],
['action', 'verify', ['certificate']],
['action', 'verify', ['signature']],
],
mutually_exclusive=[
['privatekey_path', 'certificate']
],
)

if not os.path.isfile(module.params['path']):
Expand Down Expand Up @@ -364,7 +302,7 @@ def main():
module.fail_json(msg=missing_required_lib('pyOpenSSL >= {0}'.format(MINIMAL_PYOPENSSL_VERSION)),
exception=PYOPENSSL_IMP_ERR)
module.deprecate('The module is using the PyOpenSSL backend. This backend has been deprecated',
version='2.13')
version='2.0.0', collection_name='community.crypto')
_sign = SignaturePyOpenSSL(module, backend)
elif backend == 'cryptography':
if not CRYPTOGRAPHY_FOUND:
Expand Down
Loading

0 comments on commit d2f60d2

Please sign in to comment.