From 05220e06a23b3d4c398628918dab113abcce52db Mon Sep 17 00:00:00 2001 From: aeitzman <12433791+aeitzman@users.noreply.github.com> Date: Mon, 10 Jun 2024 13:25:01 -0700 Subject: [PATCH] feat: adds X509 workload cert logic (#1527) * feat: adds X509 workload cert logic * add JSON checking, and edits comments * Adds comment with more explanation * fix test coverage --------- Co-authored-by: Leo <39062083+lsirac@users.noreply.github.com> --- google/auth/transport/_mtls_helper.py | 146 +++++++++++++++-- tests/transport/test__mtls_helper.py | 219 ++++++++++++++++++++++---- tests/transport/test_grpc.py | 36 ++--- 3 files changed, 339 insertions(+), 62 deletions(-) diff --git a/google/auth/transport/_mtls_helper.py b/google/auth/transport/_mtls_helper.py index 1b9b9c285..e95b953a1 100644 --- a/google/auth/transport/_mtls_helper.py +++ b/google/auth/transport/_mtls_helper.py @@ -16,13 +16,15 @@ import json import logging -from os import path +from os import environ, path import re import subprocess from google.auth import exceptions CONTEXT_AWARE_METADATA_PATH = "~/.secureConnect/context_aware_metadata.json" +_CERTIFICATE_CONFIGURATION_DEFAULT_PATH = "~/.config/gcloud/certificate_config.json" +_CERTIFICATE_CONFIGURATION_ENV = "GOOGLE_API_CERTIFICATE_CONFIG" _CERT_PROVIDER_COMMAND = "cert_provider_command" _CERT_REGEX = re.compile( b"-----BEGIN CERTIFICATE-----.+-----END CERTIFICATE-----\r?\n?", re.DOTALL @@ -63,26 +65,150 @@ def _check_dca_metadata_path(metadata_path): return metadata_path -def _read_dca_metadata_file(metadata_path): - """Loads context aware metadata from the given path. +def _load_json_file(path): + """Reads and loads JSON from the given path. Used to read both X509 workload certificate and + secure connect configurations. Args: - metadata_path (str): context aware metadata path. + path (str): the path to read from. Returns: - Dict[str, str]: The metadata. + Dict[str, str]: The JSON stored at the file. Raises: - google.auth.exceptions.ClientCertError: If failed to parse metadata as JSON. + google.auth.exceptions.ClientCertError: If failed to parse the file as JSON. """ try: - with open(metadata_path) as f: - metadata = json.load(f) + with open(path) as f: + json_data = json.load(f) except ValueError as caught_exc: new_exc = exceptions.ClientCertError(caught_exc) raise new_exc from caught_exc - return metadata + return json_data + + +def _get_workload_cert_and_key(certificate_config_path=None): + """Read the workload identity cert and key files specified in the certificate config provided. + If no config path is provided, check the environment variable: "GOOGLE_API_CERTIFICATE_CONFIG" + first, then the well known gcloud location: "~/.config/gcloud/certificate_config.json". + + Args: + certificate_config_path (string): The certificate config path. If no path is provided, + the environment variable will be checked first, then the well known gcloud location. + + Returns: + Tuple[Optional[bytes], Optional[bytes]]: client certificate bytes in PEM format and key + bytes in PEM format. + + Raises: + google.auth.exceptions.ClientCertError: if problems occurs when retrieving + the certificate or key information. + """ + absolute_path = _get_cert_config_path(certificate_config_path) + if absolute_path is None: + return None, None + data = _load_json_file(absolute_path) + + if "cert_configs" not in data: + raise exceptions.ClientCertError( + 'Certificate config file {} is in an invalid format, a "cert configs" object is expected'.format( + absolute_path + ) + ) + cert_configs = data["cert_configs"] + + if "workload" not in cert_configs: + raise exceptions.ClientCertError( + 'Certificate config file {} is in an invalid format, a "workload" cert config is expected'.format( + absolute_path + ) + ) + workload = cert_configs["workload"] + + if "cert_path" not in workload: + raise exceptions.ClientCertError( + 'Certificate config file {} is in an invalid format, a "cert_path" is expected in the workload cert config'.format( + absolute_path + ) + ) + cert_path = workload["cert_path"] + + if "key_path" not in workload: + raise exceptions.ClientCertError( + 'Certificate config file {} is in an invalid format, a "key_path" is expected in the workload cert config'.format( + absolute_path + ) + ) + key_path = workload["key_path"] + + return _read_cert_and_key_files(cert_path, key_path) + + +def _get_cert_config_path(certificate_config_path=None): + """Gets the certificate configuration full path using the following order of precedence: + + 1: Explicit override, if set + 2: Environment variable, if set + 3: Well-known location + + Returns "None" if the selected config file does not exist. + + Args: + certificate_config_path (string): The certificate config path. If provided, the well known + location and environment variable will be ignored. + + Returns: + The absolute path of the certificate config file, and None if the file does not exist. + """ + + if certificate_config_path is None: + env_path = environ.get(_CERTIFICATE_CONFIGURATION_ENV, None) + if env_path is not None and env_path != "": + certificate_config_path = env_path + else: + certificate_config_path = _CERTIFICATE_CONFIGURATION_DEFAULT_PATH + + certificate_config_path = path.expanduser(certificate_config_path) + if not path.exists(certificate_config_path): + return None + return certificate_config_path + + +def _read_cert_and_key_files(cert_path, key_path): + cert_data = _read_cert_file(cert_path) + key_data = _read_key_file(key_path) + + return cert_data, key_data + + +def _read_cert_file(cert_path): + with open(cert_path, "rb") as cert_file: + cert_data = cert_file.read() + + cert_match = re.findall(_CERT_REGEX, cert_data) + if len(cert_match) != 1: + raise exceptions.ClientCertError( + "Certificate file {} is in an invalid format, a single PEM formatted certificate is expected".format( + cert_path + ) + ) + return cert_match[0] + + +def _read_key_file(key_path): + with open(key_path, "rb") as key_file: + key_data = key_file.read() + + key_match = re.findall(_KEY_REGEX, key_data) + if len(key_match) != 1: + raise exceptions.ClientCertError( + "Private key file {} is in an invalid format, a single PEM formatted private key is expected".format( + key_path + ) + ) + + return key_match[0] def _run_cert_provider_command(command, expect_encrypted_key=False): @@ -163,7 +289,7 @@ def get_client_ssl_credentials( metadata_path = _check_dca_metadata_path(context_aware_metadata_path) if metadata_path: - metadata_json = _read_dca_metadata_file(metadata_path) + metadata_json = _load_json_file(metadata_path) if _CERT_PROVIDER_COMMAND not in metadata_json: raise exceptions.ClientCertError("Cert provider command is not found") diff --git a/tests/transport/test__mtls_helper.py b/tests/transport/test__mtls_helper.py index 1621a0530..b195616dd 100644 --- a/tests/transport/test__mtls_helper.py +++ b/tests/transport/test__mtls_helper.py @@ -126,7 +126,7 @@ def test_failure(self): class TestReadMetadataFile(object): def test_success(self): metadata_path = os.path.join(pytest.data_dir, "context_aware_metadata.json") - metadata = _mtls_helper._read_dca_metadata_file(metadata_path) + metadata = _mtls_helper._load_json_file(metadata_path) assert "cert_provider_command" in metadata @@ -134,7 +134,7 @@ def test_file_not_json(self): # read a file which is not json format. metadata_path = os.path.join(pytest.data_dir, "privatekey.pem") with pytest.raises(exceptions.ClientCertError): - _mtls_helper._read_dca_metadata_file(metadata_path) + _mtls_helper._load_json_file(metadata_path) class TestRunCertProviderCommand(object): @@ -277,22 +277,18 @@ class TestGetClientSslCredentials(object): @mock.patch( "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True ) - @mock.patch( - "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True - ) + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_success( self, mock_check_dca_metadata_path, - mock_read_dca_metadata_file, + mock_load_json_file, mock_run_cert_provider_command, ): mock_check_dca_metadata_path.return_value = True - mock_read_dca_metadata_file.return_value = { - "cert_provider_command": ["command"] - } + mock_load_json_file.return_value = {"cert_provider_command": ["command"]} mock_run_cert_provider_command.return_value = (b"cert", b"key", None) has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials() assert has_cert @@ -314,22 +310,18 @@ def test_success_without_metadata(self, mock_check_dca_metadata_path): @mock.patch( "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True ) - @mock.patch( - "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True - ) + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_success_with_encrypted_key( self, mock_check_dca_metadata_path, - mock_read_dca_metadata_file, + mock_load_json_file, mock_run_cert_provider_command, ): mock_check_dca_metadata_path.return_value = True - mock_read_dca_metadata_file.return_value = { - "cert_provider_command": ["command"] - } + mock_load_json_file.return_value = {"cert_provider_command": ["command"]} mock_run_cert_provider_command.return_value = (b"cert", b"key", b"passphrase") has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials( generate_encrypted_key=True @@ -342,40 +334,34 @@ def test_success_with_encrypted_key( ["command", "--with_passphrase"], expect_encrypted_key=True ) - @mock.patch( - "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True - ) + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_missing_cert_command( - self, mock_check_dca_metadata_path, mock_read_dca_metadata_file + self, mock_check_dca_metadata_path, mock_load_json_file ): mock_check_dca_metadata_path.return_value = True - mock_read_dca_metadata_file.return_value = {} + mock_load_json_file.return_value = {} with pytest.raises(exceptions.ClientCertError): _mtls_helper.get_client_ssl_credentials() @mock.patch( "google.auth.transport._mtls_helper._run_cert_provider_command", autospec=True ) - @mock.patch( - "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True - ) + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_customize_context_aware_metadata_path( self, mock_check_dca_metadata_path, - mock_read_dca_metadata_file, + mock_load_json_file, mock_run_cert_provider_command, ): context_aware_metadata_path = "/path/to/metata/data" mock_check_dca_metadata_path.return_value = context_aware_metadata_path - mock_read_dca_metadata_file.return_value = { - "cert_provider_command": ["command"] - } + mock_load_json_file.return_value = {"cert_provider_command": ["command"]} mock_run_cert_provider_command.return_value = (b"cert", b"key", None) has_cert, cert, key, passphrase = _mtls_helper.get_client_ssl_credentials( @@ -387,7 +373,182 @@ def test_customize_context_aware_metadata_path( assert key == b"key" assert passphrase is None mock_check_dca_metadata_path.assert_called_with(context_aware_metadata_path) - mock_read_dca_metadata_file.assert_called_with(context_aware_metadata_path) + mock_load_json_file.assert_called_with(context_aware_metadata_path) + + +class TestGetWorkloadCertAndKey(object): + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) + @mock.patch( + "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True + ) + @mock.patch( + "google.auth.transport._mtls_helper._read_cert_and_key_files", autospec=True + ) + def test_success( + self, + mock_read_cert_and_key_files, + mock_get_cert_config_path, + mock_load_json_file, + ): + cert_config_path = "/path/to/cert" + mock_get_cert_config_path.return_value = "/path/to/cert" + mock_load_json_file.return_value = { + "cert_configs": { + "workload": {"cert_path": "cert/path", "key_path": "key/path"} + } + } + mock_read_cert_and_key_files.return_value = ( + pytest.public_cert_bytes, + pytest.private_key_bytes, + ) + + actual_cert, actual_key = _mtls_helper._get_workload_cert_and_key( + cert_config_path + ) + assert actual_cert == pytest.public_cert_bytes + assert actual_key == pytest.private_key_bytes + + @mock.patch( + "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True + ) + def test_file_not_found_returns_none(self, mock_get_cert_config_path): + mock_get_cert_config_path.return_value = None + + actual_cert, actual_key = _mtls_helper._get_workload_cert_and_key() + assert actual_cert is None + assert actual_key is None + + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) + @mock.patch( + "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True + ) + def test_no_cert_configs(self, mock_get_cert_config_path, mock_load_json_file): + mock_get_cert_config_path.return_value = "/path/to/cert" + mock_load_json_file.return_value = {} + + with pytest.raises(exceptions.ClientCertError): + _mtls_helper._get_workload_cert_and_key("") + + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) + @mock.patch( + "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True + ) + def test_no_workload(self, mock_get_cert_config_path, mock_load_json_file): + mock_get_cert_config_path.return_value = "/path/to/cert" + mock_load_json_file.return_value = {"cert_configs": {}} + + with pytest.raises(exceptions.ClientCertError): + _mtls_helper._get_workload_cert_and_key("") + + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) + @mock.patch( + "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True + ) + def test_no_cert_file(self, mock_get_cert_config_path, mock_load_json_file): + mock_get_cert_config_path.return_value = "/path/to/cert" + mock_load_json_file.return_value = { + "cert_configs": {"workload": {"key_path": "path/to/key"}} + } + + with pytest.raises(exceptions.ClientCertError): + _mtls_helper._get_workload_cert_and_key("") + + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) + @mock.patch( + "google.auth.transport._mtls_helper._get_cert_config_path", autospec=True + ) + def test_no_key_file(self, mock_get_cert_config_path, mock_load_json_file): + mock_get_cert_config_path.return_value = "/path/to/cert" + mock_load_json_file.return_value = { + "cert_configs": {"workload": {"cert_path": "path/to/key"}} + } + + with pytest.raises(exceptions.ClientCertError): + _mtls_helper._get_workload_cert_and_key("") + + +class TestReadCertAndKeyFile(object): + def test_success(self): + cert_path = os.path.join(pytest.data_dir, "public_cert.pem") + key_path = os.path.join(pytest.data_dir, "privatekey.pem") + + actual_cert, actual_key = _mtls_helper._read_cert_and_key_files( + cert_path, key_path + ) + assert actual_cert == pytest.public_cert_bytes + assert actual_key == pytest.private_key_bytes + + def test_no_cert_file(self): + cert_path = "fake/file/path" + key_path = os.path.join(pytest.data_dir, "privatekey.pem") + with pytest.raises(FileNotFoundError): + _mtls_helper._read_cert_and_key_files(cert_path, key_path) + + def test_no_key_file(self): + cert_path = os.path.join(pytest.data_dir, "public_cert.pem") + key_path = "fake/file/path" + with pytest.raises(FileNotFoundError): + _mtls_helper._read_cert_and_key_files(cert_path, key_path) + + def test_invalid_cert_file(self): + cert_path = os.path.join(pytest.data_dir, "service_account.json") + key_path = os.path.join(pytest.data_dir, "privatekey.pem") + with pytest.raises(exceptions.ClientCertError): + _mtls_helper._read_cert_and_key_files(cert_path, key_path) + + def test_invalid_key_file(self): + cert_path = os.path.join(pytest.data_dir, "public_cert.pem") + key_path = os.path.join(pytest.data_dir, "public_cert.pem") + with pytest.raises(exceptions.ClientCertError): + _mtls_helper._read_cert_and_key_files(cert_path, key_path) + + +class TestGetCertConfigPath(object): + def test_success_with_override(self): + config_path = os.path.join(pytest.data_dir, "service_account.json") + returned_path = _mtls_helper._get_cert_config_path(config_path) + assert returned_path == config_path + + def test_override_does_not_exist(self): + config_path = "fake/file/path" + returned_path = _mtls_helper._get_cert_config_path(config_path) + assert returned_path is None + + @mock.patch.dict(os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": ""}) + @mock.patch("os.path.exists", autospec=True) + def test_default(self, mock_path_exists): + mock_path_exists.return_value = True + returned_path = _mtls_helper._get_cert_config_path() + expected_path = os.path.expanduser( + _mtls_helper._CERTIFICATE_CONFIGURATION_DEFAULT_PATH + ) + assert returned_path == expected_path + + @mock.patch.dict( + os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": "path/to/config/file"} + ) + @mock.patch("os.path.exists", autospec=True) + def test_env_variable(self, mock_path_exists): + mock_path_exists.return_value = True + returned_path = _mtls_helper._get_cert_config_path() + expected_path = "path/to/config/file" + assert returned_path == expected_path + + @mock.patch.dict(os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": ""}) + @mock.patch("os.path.exists", autospec=True) + def test_env_variable_file_does_not_exist(self, mock_path_exists): + mock_path_exists.return_value = False + returned_path = _mtls_helper._get_cert_config_path() + assert returned_path is None + + @mock.patch.dict( + os.environ, {"GOOGLE_API_CERTIFICATE_CONFIG": "path/to/config/file"} + ) + @mock.patch("os.path.exists", autospec=True) + def test_default_file_does_not_exist(self, mock_path_exists): + mock_path_exists.return_value = False + returned_path = _mtls_helper._get_cert_config_path() + assert returned_path is None class TestGetClientCertAndKey(object): diff --git a/tests/transport/test_grpc.py b/tests/transport/test_grpc.py index f62ab0eae..433cc6855 100644 --- a/tests/transport/test_grpc.py +++ b/tests/transport/test_grpc.py @@ -141,16 +141,14 @@ def test__get_authorization_headers_with_service_account_and_default_host(self): @mock.patch("grpc.ssl_channel_credentials", autospec=True) @mock.patch("grpc.secure_channel", autospec=True) class TestSecureAuthorizedChannel(object): - @mock.patch( - "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True - ) + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_secure_authorized_channel_adc( self, check_dca_metadata_path, - read_dca_metadata_file, + load_json_file, secure_channel, ssl_channel_credentials, metadata_call_credentials, @@ -164,9 +162,7 @@ def test_secure_authorized_channel_adc( # Mock the context aware metadata and client cert/key so mTLS SSL channel # will be used. check_dca_metadata_path.return_value = METADATA_PATH - read_dca_metadata_file.return_value = { - "cert_provider_command": ["some command"] - } + load_json_file.return_value = {"cert_provider_command": ["some command"]} get_client_ssl_credentials.return_value = ( True, PUBLIC_CERT_BYTES, @@ -334,16 +330,14 @@ def test_secure_authorized_channel_with_client_cert_callback_success( ssl_channel_credentials.return_value, metadata_call_credentials.return_value ) - @mock.patch( - "google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True - ) + @mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) def test_secure_authorized_channel_with_client_cert_callback_failure( self, check_dca_metadata_path, - read_dca_metadata_file, + load_json_file, secure_channel, ssl_channel_credentials, metadata_call_credentials, @@ -405,7 +399,7 @@ def test_secure_authorized_channel_cert_callback_without_client_cert_env( @mock.patch( "google.auth.transport._mtls_helper.get_client_ssl_credentials", autospec=True ) -@mock.patch("google.auth.transport._mtls_helper._read_dca_metadata_file", autospec=True) +@mock.patch("google.auth.transport._mtls_helper._load_json_file", autospec=True) @mock.patch( "google.auth.transport._mtls_helper._check_dca_metadata_path", autospec=True ) @@ -413,7 +407,7 @@ class TestSslCredentials(object): def test_no_context_aware_metadata( self, mock_check_dca_metadata_path, - mock_read_dca_metadata_file, + mock_load_json_file, mock_get_client_ssl_credentials, mock_ssl_channel_credentials, ): @@ -436,14 +430,12 @@ def test_no_context_aware_metadata( def test_get_client_ssl_credentials_failure( self, mock_check_dca_metadata_path, - mock_read_dca_metadata_file, + mock_load_json_file, mock_get_client_ssl_credentials, mock_ssl_channel_credentials, ): mock_check_dca_metadata_path.return_value = METADATA_PATH - mock_read_dca_metadata_file.return_value = { - "cert_provider_command": ["some command"] - } + mock_load_json_file.return_value = {"cert_provider_command": ["some command"]} # Mock that client cert and key are not loaded and exception is raised. mock_get_client_ssl_credentials.side_effect = exceptions.ClientCertError() @@ -457,14 +449,12 @@ def test_get_client_ssl_credentials_failure( def test_get_client_ssl_credentials_success( self, mock_check_dca_metadata_path, - mock_read_dca_metadata_file, + mock_load_json_file, mock_get_client_ssl_credentials, mock_ssl_channel_credentials, ): mock_check_dca_metadata_path.return_value = METADATA_PATH - mock_read_dca_metadata_file.return_value = { - "cert_provider_command": ["some command"] - } + mock_load_json_file.return_value = {"cert_provider_command": ["some command"]} mock_get_client_ssl_credentials.return_value = ( True, PUBLIC_CERT_BYTES, @@ -487,7 +477,7 @@ def test_get_client_ssl_credentials_success( def test_get_client_ssl_credentials_without_client_cert_env( self, mock_check_dca_metadata_path, - mock_read_dca_metadata_file, + mock_load_json_file, mock_get_client_ssl_credentials, mock_ssl_channel_credentials, ): @@ -497,6 +487,6 @@ def test_get_client_ssl_credentials_without_client_cert_env( assert ssl_credentials.ssl_credentials is not None assert not ssl_credentials.is_mtls mock_check_dca_metadata_path.assert_not_called() - mock_read_dca_metadata_file.assert_not_called() + mock_load_json_file.assert_not_called() mock_get_client_ssl_credentials.assert_not_called() mock_ssl_channel_credentials.assert_called_once()