Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Key Vault] Add local-only mode to CryptographyClient #16565

Merged
merged 18 commits into from
Mar 10, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ class KeyVaultKey(object):

def __init__(self, key_id, jwk=None, **kwargs):
# type: (str, Optional[dict], **Any) -> None
self._properties = kwargs.pop("properties", None) or KeyProperties(key_id, **kwargs)
if not kwargs.pop("_local_only", False):
mccoyp marked this conversation as resolved.
Show resolved Hide resolved
self._properties = kwargs.pop("properties", None) or KeyProperties(key_id, **kwargs)
if isinstance(jwk, dict):
if any(field in kwargs for field in JsonWebKey._FIELDS): # pylint:disable=protected-access
raise ValueError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from ._key_validity import raise_if_time_invalid
from ._providers import get_local_cryptography_provider, NoLocalCryptography
from .. import KeyOperation
from .._models import KeyVaultKey
from .._models import JsonWebKey, KeyVaultKey
from .._shared import KeyVaultClientBase, parse_key_vault_id

if TYPE_CHECKING:
Expand Down Expand Up @@ -98,6 +98,7 @@ class CryptographyClient(KeyVaultClientBase):

def __init__(self, key, credential, **kwargs):
# type: (Union[KeyVaultKey, str], TokenCredential, **Any) -> None
self._jwk = kwargs.pop("_jwk", False)

if isinstance(key, KeyVaultKey):
self._key = key
Expand All @@ -106,16 +107,20 @@ def __init__(self, key, credential, **kwargs):
self._key = None
self._key_id = parse_key_vault_id(key)
self._keys_get_forbidden = None # type: Optional[bool]
elif self._jwk:
self._key_id = key.get("kid", "")
mccoyp marked this conversation as resolved.
Show resolved Hide resolved
self._key = KeyVaultKey(None, jwk=key, _local_only=True)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Everything we need for local crypto is in the JWK. Do we really need to jam that into KeyVaultKey?

Copy link
Member Author

@mccoyp mccoyp Mar 5, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the local crypto providers and operations assume that they're working with a KeyVaultKey, so we'd have to do so unless we refactor them to accept a JsonWebKey (unless I'm mistaken)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's unreasonable by any means to make local providers accept JWKs, but it seems like a design question of whether we do that (and preserve the current definition of a KeyVaultKey) or make a smaller tweak to accept KeyVaultKeys intended for local-only use

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, that's what I'm getting at. Their expectation of KeyVaultKey appears to be driving additional complexity here but that expectation isn't a requirement. Actually they only need the JWK. If they took that up front, maybe it wouldn't be necessary to complicate KeyVaultKey?

else:
raise ValueError("'key' must be a KeyVaultKey instance or a key ID string including a version")

if not self._key_id.version:
if not (self._jwk or self._key_id.version):
raise ValueError("'key' must include a version")

self._local_provider = NoLocalCryptography()
self._initialized = False

super(CryptographyClient, self).__init__(vault_url=self._key_id.vault_url, credential=credential, **kwargs)
vault_url = "vault_url" if self._jwk else self._key_id.vault_url
mccoyp marked this conversation as resolved.
Show resolved Hide resolved
super(CryptographyClient, self).__init__(vault_url=vault_url, credential=credential, **kwargs)

@property
def key_id(self):
Expand All @@ -124,7 +129,31 @@ def key_id(self):

:rtype: str
"""
return self._key_id.source_id
if not self._jwk:
return self._key_id.source_id
return self._key_id

@classmethod
def from_jwk(cls, jwk):
# type: (Union[JsonWebKey, dict]) -> CryptographyClient
"""Creates a client that can only perform cryptographic operations locally.

:param jwk: the key's cryptographic material, as a JsonWebKey or dictionary.
:type jwk: JsonWebKey or dict
:rtype: CryptographyClient

.. literalinclude:: ../tests/test_examples_crypto.py
:start-after: [START from_jwk]
:end-before: [END from_jwk]
:caption: Create a CryptographyClient from a JsonWebKey
:language: python
:dedent: 8
"""
if isinstance(jwk, JsonWebKey):
key = vars(jwk)
else:
key = jwk
return cls(key, object(), _jwk=True)
mccoyp marked this conversation as resolved.
Show resolved Hide resolved

@distributed_trace
def _initialize(self, **kwargs):
Expand Down Expand Up @@ -186,6 +215,12 @@ def encrypt(self, algorithm, plaintext, **kwargs):
return self._local_provider.encrypt(algorithm, plaintext)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local encrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "encrypt" operation with algorithm "{}"'.format(algorithm)
)

operation_result = self._client.encrypt(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -240,6 +275,12 @@ def decrypt(self, algorithm, ciphertext, **kwargs):
return self._local_provider.decrypt(algorithm, ciphertext)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local decrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "decrypt" operation with algorithm "{}"'.format(algorithm)
)

operation_result = self._client.decrypt(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -277,6 +318,12 @@ def wrap_key(self, algorithm, key, **kwargs):
return self._local_provider.wrap_key(algorithm, key)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local wrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "wrapKey" operation with algorithm "{}"'.format(algorithm)
)

operation_result = self._client.wrap_key(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -311,6 +358,12 @@ def unwrap_key(self, algorithm, encrypted_key, **kwargs):
return self._local_provider.unwrap_key(algorithm, encrypted_key)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local unwrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "unwrapKey" operation with algorithm "{}"'.format(algorithm)
)

operation_result = self._client.unwrap_key(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -345,6 +398,12 @@ def sign(self, algorithm, digest, **kwargs):
return self._local_provider.sign(algorithm, digest)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local sign operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "sign" operation with algorithm "{}"'.format(algorithm)
)

operation_result = self._client.sign(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -381,6 +440,12 @@ def verify(self, algorithm, digest, signature, **kwargs):
return self._local_provider.verify(algorithm, digest, signature)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local verify operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "verify" operation with algorithm "{}"'.format(algorithm)
)

operation_result = self._client.verify(
vault_base_url=self._key_id.vault_url,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from .._key_validity import raise_if_time_invalid
from .._providers import get_local_cryptography_provider, NoLocalCryptography
from ... import KeyOperation
from ..._models import KeyVaultKey
from ..._models import JsonWebKey, KeyVaultKey
from ..._shared import AsyncKeyVaultClientBase, parse_key_vault_id

if TYPE_CHECKING:
Expand Down Expand Up @@ -53,31 +53,60 @@ class CryptographyClient(AsyncKeyVaultClientBase):
"""

def __init__(self, key: "Union[KeyVaultKey, str]", credential: "AsyncTokenCredential", **kwargs: "Any") -> None:
self._jwk = kwargs.pop("_jwk", False)

if isinstance(key, KeyVaultKey):
self._key = key
self._key_id = parse_key_vault_id(key.id)
elif isinstance(key, str):
self._key = None
self._key_id = parse_key_vault_id(key)
self._keys_get_forbidden = None # type: Optional[bool]
elif self._jwk:
self._key_id = key.get("kid", "")
self._key = KeyVaultKey(None, jwk=key, _local_only=True)
else:
raise ValueError("'key' must be a KeyVaultKey instance or a key ID string including a version")

if not self._key_id.version:
if not (self._jwk or self._key_id.version):
raise ValueError("'key' must include a version")

self._local_provider = NoLocalCryptography()
self._initialized = False

super().__init__(vault_url=self._key_id.vault_url, credential=credential, **kwargs)
vault_url = "vault_url" if self._jwk else self._key_id.vault_url
super().__init__(vault_url=vault_url, credential=credential, **kwargs)

@property
def key_id(self) -> str:
"""The full identifier of the client's key.

:rtype: str
"""
return self._key_id.source_id
if not self._jwk:
return self._key_id.source_id
return self._key_id

@classmethod
def from_jwk(cls, jwk: "Union[JsonWebKey, dict]") -> "CryptographyClient":
"""Creates a client that can only perform cryptographic operations locally.

:param jwk: the key's cryptographic material, as a JsonWebKey or dictionary.
:type jwk: JsonWebKey or dict
:rtype: CryptographyClient

.. literalinclude:: ../tests/test_examples_crypto.py
:start-after: [START from_jwk]
:end-before: [END from_jwk]
:caption: Create a CryptographyClient from a JsonWebKey
:language: python
:dedent: 8
"""
if isinstance(jwk, JsonWebKey):
key = vars(jwk)
else:
key = jwk
return cls(key, object(), _jwk=True)

@distributed_trace_async
async def _initialize(self, **kwargs):
Expand Down Expand Up @@ -138,6 +167,12 @@ async def encrypt(self, algorithm: "EncryptionAlgorithm", plaintext: bytes, **kw
return self._local_provider.encrypt(algorithm, plaintext)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local encrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "encrypt" operation with algorithm "{}"'.format(algorithm)
)

operation_result = await self._client.encrypt(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -191,6 +226,12 @@ async def decrypt(self, algorithm: "EncryptionAlgorithm", ciphertext: bytes, **k
return self._local_provider.decrypt(algorithm, ciphertext)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local decrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "decrypt" operation with algorithm "{}"'.format(algorithm)
)

operation_result = await self._client.decrypt(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -227,6 +268,12 @@ async def wrap_key(self, algorithm: "KeyWrapAlgorithm", key: bytes, **kwargs: "A
return self._local_provider.wrap_key(algorithm, key)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local wrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "wrapKey" operation with algorithm "{}"'.format(algorithm)
)

operation_result = await self._client.wrap_key(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -260,6 +307,12 @@ async def unwrap_key(self, algorithm: "KeyWrapAlgorithm", encrypted_key: bytes,
return self._local_provider.unwrap_key(algorithm, encrypted_key)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local unwrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "unwrapKey" operation with algorithm "{}"'.format(algorithm)
)

operation_result = await self._client.unwrap_key(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -294,6 +347,12 @@ async def sign(self, algorithm: "SignatureAlgorithm", digest: bytes, **kwargs: "
return self._local_provider.sign(algorithm, digest)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local sign operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "sign" operation with algorithm "{}"'.format(algorithm)
)

operation_result = await self._client.sign(
vault_base_url=self._key_id.vault_url,
Expand Down Expand Up @@ -331,6 +390,12 @@ async def verify(
return self._local_provider.verify(algorithm, digest, signature)
except Exception as ex: # pylint:disable=broad-except
_LOGGER.warning("Local verify operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))
if self._jwk:
raise
elif self._jwk:
raise NotImplementedError(
'This key does not support the "verify" operation with algorithm "{}"'.format(algorithm)
)

operation_result = await self._client.verify(
vault_base_url=self._key_id.vault_url,
Expand Down
Loading