Skip to content

Commit

Permalink
First prototype of from_jwk
Browse files Browse the repository at this point in the history
  • Loading branch information
mccoyp committed Feb 25, 2021
1 parent 1bfb95a commit 13dd7a2
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ def __init__(self, key, credential, **kwargs):

self._local_provider = NoLocalCryptography()
self._initialized = False
self._local_only = kwargs.pop("local_only", False)

super(CryptographyClient, self).__init__(vault_url=self._key_id.vault_url, credential=credential, **kwargs)

Expand All @@ -126,14 +127,24 @@ def key_id(self):
"""
return self._key_id.source_id

@classmethod
def from_jkw(cls, key_id, jwk):
# type: (str, dict) -> CryptographyClient
"""Creates a client that can only perform cryptographic operations locally.
:param str key_id: the full identifier of an Azure Key Vault key with a version.
:param dict jwk: the key's cryptographic material, as a dictionary.
"""
return cls(KeyVaultKey(key_id, jwk), object, local_only=True)

@distributed_trace
def _initialize(self, **kwargs):
# type: (**Any) -> None
if self._initialized:
return

# try to get the key material, if we don't have it and aren't forbidden to do so
if not (self._key or self._keys_get_forbidden):
if not (self._key or self._keys_get_forbidden or self._local_only):
try:
key_bundle = self._client.get_key(
self._key_id.vault_url, self._key_id.name, self._key_id.version, **kwargs
Expand Down Expand Up @@ -180,11 +191,13 @@ def encrypt(self, algorithm, plaintext, **kwargs):
_validate_arguments(operation=KeyOperation.encrypt, algorithm=algorithm, iv=iv, aad=aad)
self._initialize(**kwargs)

if self._local_provider.supports(KeyOperation.encrypt, algorithm):
if self._local_provider.supports(KeyOperation.encrypt, algorithm) or self._local_only:
raise_if_time_invalid(self._key)
try:
return self._local_provider.encrypt(algorithm, plaintext)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local encrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = self._client.encrypt(
Expand Down Expand Up @@ -235,10 +248,12 @@ def decrypt(self, algorithm, ciphertext, **kwargs):
_validate_arguments(operation=KeyOperation.decrypt, algorithm=algorithm, iv=iv, tag=tag, aad=aad)
self._initialize(**kwargs)

if self._local_provider.supports(KeyOperation.decrypt, algorithm):
if self._local_provider.supports(KeyOperation.decrypt, algorithm) or self._local_only:
try:
return self._local_provider.decrypt(algorithm, ciphertext)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local decrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = self._client.decrypt(
Expand Down Expand Up @@ -271,11 +286,13 @@ def wrap_key(self, algorithm, key, **kwargs):
:dedent: 8
"""
self._initialize(**kwargs)
if self._local_provider.supports(KeyOperation.wrap_key, algorithm):
if self._local_provider.supports(KeyOperation.wrap_key, algorithm) or self._local_only:
raise_if_time_invalid(self._key)
try:
return self._local_provider.wrap_key(algorithm, key)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local wrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = self._client.wrap_key(
Expand Down Expand Up @@ -306,10 +323,12 @@ def unwrap_key(self, algorithm, encrypted_key, **kwargs):
:dedent: 8
"""
self._initialize(**kwargs)
if self._local_provider.supports(KeyOperation.unwrap_key, algorithm):
if self._local_provider.supports(KeyOperation.unwrap_key, algorithm) or self._local_only:
try:
return self._local_provider.unwrap_key(algorithm, encrypted_key)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local unwrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = self._client.unwrap_key(
Expand Down Expand Up @@ -339,11 +358,13 @@ def sign(self, algorithm, digest, **kwargs):
:dedent: 8
"""
self._initialize(**kwargs)
if self._local_provider.supports(KeyOperation.sign, algorithm):
if self._local_provider.supports(KeyOperation.sign, algorithm) or self._local_only:
raise_if_time_invalid(self._key)
try:
return self._local_provider.sign(algorithm, digest)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local sign operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = self._client.sign(
Expand Down Expand Up @@ -376,10 +397,12 @@ def verify(self, algorithm, digest, signature, **kwargs):
:dedent: 8
"""
self._initialize(**kwargs)
if self._local_provider.supports(KeyOperation.verify, algorithm):
if self._local_provider.supports(KeyOperation.verify, algorithm) or self._local_only:
try:
return self._local_provider.verify(algorithm, digest, signature)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local verify operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = self._client.verify(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(self, key: "Union[KeyVaultKey, str]", credential: "AsyncTokenCreden

self._local_provider = NoLocalCryptography()
self._initialized = False
self._local_only = kwargs.pop("local_only", False)

super().__init__(vault_url=self._key_id.vault_url, credential=credential, **kwargs)

Expand All @@ -79,14 +80,24 @@ def key_id(self) -> str:
"""
return self._key_id.source_id

@classmethod
def from_jkw(cls, key_id, jwk):
# type: (str, dict) -> CryptographyClient
"""Creates a client that can only perform cryptographic operations locally.
:param str key_id: the full identifier of an Azure Key Vault key with a version.
:param dict jwk: the key's cryptographic material, as a dictionary.
"""
return cls(KeyVaultKey(key_id, jwk), object, local_only=True)

@distributed_trace_async
async def _initialize(self, **kwargs):
# type: (**Any) -> None
if self._initialized:
return

# try to get the key material, if we don't have it and aren't forbidden to do so
if not (self._key or self._keys_get_forbidden):
if not (self._key or self._keys_get_forbidden or self._local_only):
try:
key_bundle = await self._client.get_key(
self._key_id.vault_url, self._key_id.name, self._key_id.version, **kwargs
Expand Down Expand Up @@ -132,11 +143,13 @@ async def encrypt(self, algorithm: "EncryptionAlgorithm", plaintext: bytes, **kw
_validate_arguments(operation=KeyOperation.encrypt, algorithm=algorithm, iv=iv, aad=aad)
await self._initialize(**kwargs)

if self._local_provider.supports(KeyOperation.encrypt, algorithm):
if self._local_provider.supports(KeyOperation.encrypt, algorithm) or self._local_only:
raise_if_time_invalid(self._key)
try:
return self._local_provider.encrypt(algorithm, plaintext)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local encrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = await self._client.encrypt(
Expand Down Expand Up @@ -186,10 +199,12 @@ async def decrypt(self, algorithm: "EncryptionAlgorithm", ciphertext: bytes, **k
_validate_arguments(operation=KeyOperation.decrypt, algorithm=algorithm, iv=iv, tag=tag, aad=aad)
await self._initialize(**kwargs)

if self._local_provider.supports(KeyOperation.decrypt, algorithm):
if self._local_provider.supports(KeyOperation.decrypt, algorithm) or self._local_only:
try:
return self._local_provider.decrypt(algorithm, ciphertext)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local decrypt operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = await self._client.decrypt(
Expand Down Expand Up @@ -221,11 +236,13 @@ async def wrap_key(self, algorithm: "KeyWrapAlgorithm", key: bytes, **kwargs: "A
:dedent: 8
"""
await self._initialize(**kwargs)
if self._local_provider.supports(KeyOperation.wrap_key, algorithm):
if self._local_provider.supports(KeyOperation.wrap_key, algorithm) or self._local_only:
raise_if_time_invalid(self._key)
try:
return self._local_provider.wrap_key(algorithm, key)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local wrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = await self._client.wrap_key(
Expand Down Expand Up @@ -255,10 +272,12 @@ async def unwrap_key(self, algorithm: "KeyWrapAlgorithm", encrypted_key: bytes,
:dedent: 8
"""
await self._initialize(**kwargs)
if self._local_provider.supports(KeyOperation.unwrap_key, algorithm):
if self._local_provider.supports(KeyOperation.unwrap_key, algorithm) or self._local_only:
try:
return self._local_provider.unwrap_key(algorithm, encrypted_key)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local unwrap operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = await self._client.unwrap_key(
Expand Down Expand Up @@ -288,11 +307,13 @@ async def sign(self, algorithm: "SignatureAlgorithm", digest: bytes, **kwargs: "
:dedent: 8
"""
await self._initialize(**kwargs)
if self._local_provider.supports(KeyOperation.sign, algorithm):
if self._local_provider.supports(KeyOperation.sign, algorithm) or self._local_only:
raise_if_time_invalid(self._key)
try:
return self._local_provider.sign(algorithm, digest)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local sign operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = await self._client.sign(
Expand Down Expand Up @@ -326,10 +347,12 @@ async def verify(
:dedent: 8
"""
await self._initialize(**kwargs)
if self._local_provider.supports(KeyOperation.verify, algorithm):
if self._local_provider.supports(KeyOperation.verify, algorithm) or self._local_only:
try:
return self._local_provider.verify(algorithm, digest, signature)
except Exception as ex: # pylint:disable=broad-except
if self._local_only:
raise
_LOGGER.warning("Local verify operation failed: %s", ex, exc_info=_LOGGER.isEnabledFor(logging.DEBUG))

operation_result = await self._client.verify(
Expand Down

0 comments on commit 13dd7a2

Please sign in to comment.