diff --git a/src/validation_workflow/rpm/validation_rpm.py b/src/validation_workflow/rpm/validation_rpm.py index bf80a5615d..1d3373a031 100644 --- a/src/validation_workflow/rpm/validation_rpm.py +++ b/src/validation_workflow/rpm/validation_rpm.py @@ -27,6 +27,8 @@ def installation(self) -> bool: execute('sudo rpm --import https://artifacts.opensearch.org/publickeys/opensearch.pgp', str(self.tmp_dir.path), True, False) for project in self.args.projects: self.filename = os.path.basename(self.args.file_path.get(project)) + self.validate_metadata(project) + self.validate_signature() execute(f'sudo yum remove {project} -y', ".") execute(f'sudo env OPENSEARCH_INITIAL_ADMIN_PASSWORD={get_password(str(self.args.version))} rpm -ivh {os.path.join(self.tmp_dir.path, self.filename)}', str(self.tmp_dir.path), True, False) # noqa: 501 except: @@ -68,3 +70,63 @@ def cleanup(self) -> bool: except Exception as e: raise Exception(f'Exception occurred either while attempting to stop cluster or removing OpenSearch/OpenSearch-Dashboards. {str(e)}') return True + + def validate_metadata(self, product_type: str) -> None: + (_, stdout, _) = execute(f'rpm -qip {os.path.join(self.tmp_dir.path, self.filename)}', ".") + logging.info("Meta data for the RPM distribution is: \n" + stdout) + ref_map = {} + ref_map['Name'] = product_type + ref_map['Version'] = self.args.version + ref_map['Architecture'] = self.args.arch + ref_map['Group'] = "Application/Internet" + ref_map['License'] = "Apache-2.0" + ref_map['Relocations'] = "(not relocatable)" + ref_map['URL'] = "https://opensearch.org/" + # The context the meta data should be based on type OpenSearch or OpenSearchDashBoards + if product_type == "opensearch": + ref_map['Summary'] = "An open source distributed and RESTful search engine" + ref_map[ + 'Description'] = "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/" + else: + ref_map['Summary'] = "Open source visualization dashboards for OpenSearch" + ref_map[ + 'Description'] = "OpenSearch Dashboards is the visualization tool for data in OpenSearch\nFor more information, see: https://opensearch.org/" + + meta_map = {} + for line in stdout.split('\n'): + key = line.split(':')[0].strip() + if key != 'Description': + meta_map[key] = line.split(':', 1)[1].strip() + else: + description_index = stdout.find(line) + meta_map[key] = stdout[description_index + len(line):].strip() + break + + for key, value in ref_map.items(): + if key == "Architecture": + if value == 'x64': + assert meta_map.get(key) == 'x86_64' + elif value == 'arm64': + assert meta_map.get(key) == 'aarch64' + else: + assert meta_map.get(key) == value + logging.info(f"Meta data for {key} -> {value} is validated") + logging.info(f"Validation for {product_type} meta data of RPM distribution completed.") + + def validate_signature(self) -> None: + (_, stdout, _) = execute(f'rpm -K -v {os.path.join(self.tmp_dir.path, self.filename)}', ".") + logging.info(stdout) + key_list = ["Header V4 RSA/SHA512 Signature, key ID 9310d3fc", "Header SHA256 digest", "Header SHA1 digest", + "Payload SHA256 digest", "V4 RSA/SHA512 Signature, key ID 9310d3fc", "MD5 digest"] + present_key = [] + for line in stdout.rstrip('\n').split('\n')[1:]: + key = line.split(':')[0].strip() + assert "OK" == line.split(':')[1].strip() + logging.info(f"{key} is validated as: {line}") + present_key.append(key) + logging.info("Validation of all key digests starts: ") + for digest in key_list: + assert digest in present_key + logging.info(f'Key digest "{digest}" is validated to be present.') + + logging.info("Validation for signature of RPM distribution completed.") diff --git a/tests/tests_validation_workflow/test_validation_rpm.py b/tests/tests_validation_workflow/test_validation_rpm.py index 8a65fb9546..f07e3ab779 100644 --- a/tests/tests_validation_workflow/test_validation_rpm.py +++ b/tests/tests_validation_workflow/test_validation_rpm.py @@ -93,7 +93,9 @@ def test_exceptions(self, mock_temporary_directory: Mock, mock_validation_args: @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') @patch('system.temporary_directory.TemporaryDirectory') @patch("validation_workflow.rpm.validation_rpm.execute") - def test_installation(self, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None: + @patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_metadata") + @patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_signature") + def test_installation(self, mock_validate_signature: Mock, mock_validate_metadata: Mock, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None: mock_validation_args.return_value.version = '2.3.0' mock_validation_args.return_value.arch = 'x64' mock_validation_args.return_value.platform = 'linux' @@ -207,3 +209,131 @@ def test_cleanup(self, mock_temporary_directory: Mock, mock_validation_args: Moc result = validate_rpm.cleanup() self.assertTrue(result) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation_args: Mock, + mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, + 'Name : opensearch\n' + 'Version : 1.3.0\n' + 'Architecture : x86_64\n' + 'Group : Application/Internet\n' + 'License : Apache-2.0\n' + 'Relocations : (not relocatable)\n' + 'URL : https://opensearch.org/\n' + 'Summary : An open source distributed and RESTful search engine\n' + 'Description:\nOpenSearch makes it easy to ingest, search, visualize, and analyze your data\n' + 'For more information, see: https://opensearch.org/', + None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + validate_rpm.args.version = '1.3.0' + validate_rpm.args.arch = "x64" + + validate_rpm.validate_metadata('opensearch') + + mock_logging_info.assert_any_call("Meta data for Name -> opensearch is validated") + mock_logging_info.assert_any_call("Meta data for Version -> 1.3.0 is validated") + mock_logging_info.assert_any_call( + "Meta data for Description -> OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/ is validated") + mock_logging_info.assert_any_call("Validation for opensearch meta data of RPM distribution completed.") + + mock_execute.assert_called_once_with( + 'rpm -qip /tmp/trytytyuit/example.rpm', '.' + ) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_metadata_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock, + mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nURL: https://opensearch.org/Summary: ' + 'An open source distributed and RESTful search engine\nDescription: This is a test application\n' + ' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/', None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + validate_rpm.args.version = '1.3.0' + validate_rpm.args.arch = "x64" + with self.assertRaises(AssertionError) as context: + validate_rpm.validate_metadata('opensearch') + self.assertIsInstance(context.exception, AssertionError) + + mock_execute.assert_called_once_with( + 'rpm -qip /tmp/trytytyuit/example.rpm', '.' + ) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_signature(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, '/tmp/trytytyuit/example.rpm\nHeader V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n' + 'MD5 digest: OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + + validate_rpm.validate_signature() + + mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Validation of all key digests starts: ') + mock_logging_info.assert_any_call('Validation for signature of RPM distribution completed.') + mock_execute.assert_called_once_with( + 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' + ) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_signature_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, '/tmp/trytytyuit/example.rpm\nHeader V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n' + 'MD5 digest: not OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + + with self.assertRaises(AssertionError) as context: + validate_rpm.validate_signature() + self.assertIsInstance(context.exception, AssertionError) + + mock_execute.assert_called_once_with( + 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' + ) + + @patch('validation_workflow.rpm.validation_rpm.execute') + @patch('validation_workflow.rpm.validation_rpm.logging.info') + @patch('validation_workflow.rpm.validation_rpm.ValidationArgs') + @patch('system.temporary_directory.TemporaryDirectory') + def test_validate_signature_except(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None: + mock_execute.return_value = (None, + '/tmp/trytytyuit/example.rpm\n' + 'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA256 digest: OK\n' + 'Header SHA1 digest: OK\nPayload SHA256 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n', + None) + + validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value) + mock_temporary_directory.return_value.path = "/tmp/trytytyuit/" + validate_rpm.filename = 'example.rpm' + + with self.assertRaises(AssertionError) as context: + validate_rpm.validate_signature() + mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.') + mock_logging_info.assert_any_call('Validation of all key digests starts: ') + self.assertIsInstance(context.exception, AssertionError) + + mock_execute.assert_called_once_with( + 'rpm -K -v /tmp/trytytyuit/example.rpm', '.' + )