Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add meta and signature validation to RPM dist in Validation workflow #5006

Merged
merged 4 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions src/validation_workflow/rpm/validation_rpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ def installation(self) -> bool:
execute('sudo rpm --import https://artifacts.opensearch.org/publickeys/opensearch.pgp', str(self.tmp_dir.path), True, False)
for project in self.args.projects:
self.filename = os.path.basename(self.args.file_path.get(project))
self.validate_metadata(project)
self.validate_signature()
execute(f'sudo yum remove {project} -y', ".")
execute(f'sudo env OPENSEARCH_INITIAL_ADMIN_PASSWORD={get_password(str(self.args.version))} rpm -ivh {os.path.join(self.tmp_dir.path, self.filename)}', str(self.tmp_dir.path), True, False) # noqa: 501
except:
Expand Down Expand Up @@ -68,3 +70,63 @@ def cleanup(self) -> bool:
except Exception as e:
raise Exception(f'Exception occurred either while attempting to stop cluster or removing OpenSearch/OpenSearch-Dashboards. {str(e)}')
return True

def validate_metadata(self, product_type: str) -> None:
(_, stdout, _) = execute(f'rpm -qip {os.path.join(self.tmp_dir.path, self.filename)}', ".")
logging.info("Meta data for the RPM distribution is: \n" + stdout)
ref_map = {}
ref_map['Name'] = product_type
ref_map['Version'] = self.args.version
ref_map['Architecture'] = self.args.arch
ref_map['Group'] = "Application/Internet"
ref_map['License'] = "Apache-2.0"
ref_map['Relocations'] = "(not relocatable)"
ref_map['URL'] = "https://opensearch.org/"
# The context the meta data should be based on type OpenSearch or OpenSearchDashBoards
if product_type == "opensearch":
ref_map['Summary'] = "An open source distributed and RESTful search engine"
ref_map[
'Description'] = "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/"
else:
ref_map['Summary'] = "Open source visualization dashboards for OpenSearch"
ref_map[
'Description'] = "OpenSearch Dashboards is the visualization tool for data in OpenSearch\nFor more information, see: https://opensearch.org/"

meta_map = {}
for line in stdout.split('\n'):
key = line.split(':')[0].strip()
if key != 'Description':
meta_map[key] = line.split(':', 1)[1].strip()
else:
description_index = stdout.find(line)
meta_map[key] = stdout[description_index + len(line):].strip()
break

for key, value in ref_map.items():
if key == "Architecture":
if value == 'x64':
assert meta_map.get(key) == 'x86_64'
elif value == 'arm64':
assert meta_map.get(key) == 'aarch64'
else:
assert meta_map.get(key) == value
logging.info(f"Meta data for {key} -> {value} is validated")
logging.info(f"Validation for {product_type} meta data of RPM distribution completed.")

def validate_signature(self) -> None:
(_, stdout, _) = execute(f'rpm -K -v {os.path.join(self.tmp_dir.path, self.filename)}', ".")
logging.info(stdout)
key_list = ["Header V4 RSA/SHA512 Signature, key ID 9310d3fc", "Header SHA256 digest", "Header SHA1 digest",
"Payload SHA256 digest", "V4 RSA/SHA512 Signature, key ID 9310d3fc", "MD5 digest"]
present_key = []
for line in stdout.rstrip('\n').split('\n')[1:]:
key = line.split(':')[0].strip()
assert "OK" == line.split(':')[1].strip()
logging.info(f"{key} is validated as: {line}")
present_key.append(key)
logging.info("Validation of all key digests starts: ")
for digest in key_list:
assert digest in present_key
logging.info(f'Key digest "{digest}" is validated to be present.')

logging.info("Validation for signature of RPM distribution completed.")
132 changes: 131 additions & 1 deletion tests/tests_validation_workflow/test_validation_rpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ def test_exceptions(self, mock_temporary_directory: Mock, mock_validation_args:
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
@patch("validation_workflow.rpm.validation_rpm.execute")
def test_installation(self, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None:
@patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_metadata")
@patch("validation_workflow.rpm.validation_rpm.ValidateRpm.validate_signature")
def test_installation(self, mock_validate_signature: Mock, mock_validate_metadata: Mock, mock_temporary_directory: Mock, mock_system: Mock, mock_validation_args: Mock) -> None:
mock_validation_args.return_value.version = '2.3.0'
mock_validation_args.return_value.arch = 'x64'
mock_validation_args.return_value.platform = 'linux'
Expand Down Expand Up @@ -207,3 +209,131 @@ def test_cleanup(self, mock_temporary_directory: Mock, mock_validation_args: Moc

result = validate_rpm.cleanup()
self.assertTrue(result)

peterzhuamazon marked this conversation as resolved.
Show resolved Hide resolved
@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_metadata(self, mock_temporary_directory: Mock, mock_validation_args: Mock,
mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None,
'Name : opensearch\n'
'Version : 1.3.0\n'
'Architecture : x86_64\n'
'Group : Application/Internet\n'
'License : Apache-2.0\n'
'Relocations : (not relocatable)\n'
'URL : https://opensearch.org/\n'
'Summary : An open source distributed and RESTful search engine\n'
'Description:\nOpenSearch makes it easy to ingest, search, visualize, and analyze your data\n'
'For more information, see: https://opensearch.org/',
None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'
validate_rpm.args.version = '1.3.0'
validate_rpm.args.arch = "x64"

validate_rpm.validate_metadata('opensearch')

mock_logging_info.assert_any_call("Meta data for Name -> opensearch is validated")
mock_logging_info.assert_any_call("Meta data for Version -> 1.3.0 is validated")
mock_logging_info.assert_any_call(
"Meta data for Description -> OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/ is validated")
mock_logging_info.assert_any_call("Validation for opensearch meta data of RPM distribution completed.")

mock_execute.assert_called_once_with(
'rpm -qip /tmp/trytytyuit/example.rpm', '.'
)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_metadata_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock,
mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, 'Name: opensearch\nVersion: 1.2.3\nArchitecture: x86_64\nURL: https://opensearch.org/Summary: '
'An open source distributed and RESTful search engine\nDescription: This is a test application\n'
' "OpenSearch makes it easy to ingest, search, visualize, and analyze your data\nFor more information, see: https://opensearch.org/', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'
validate_rpm.args.version = '1.3.0'
validate_rpm.args.arch = "x64"
with self.assertRaises(AssertionError) as context:
validate_rpm.validate_metadata('opensearch')
self.assertIsInstance(context.exception, AssertionError)

mock_execute.assert_called_once_with(
'rpm -qip /tmp/trytytyuit/example.rpm', '.'
)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_signature(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, '/tmp/trytytyuit/example.rpm\nHeader V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n'
'MD5 digest: OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'

validate_rpm.validate_signature()

mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Validation of all key digests starts: ')
mock_logging_info.assert_any_call('Validation for signature of RPM distribution completed.')
mock_execute.assert_called_once_with(
'rpm -K -v /tmp/trytytyuit/example.rpm', '.'
)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_signature_exception(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None, '/tmp/trytytyuit/example.rpm\nHeader V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA1 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n'
'MD5 digest: not OK\nHeader SHA256 digest: OK\nPayload SHA256 digest: OK\n', None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'

with self.assertRaises(AssertionError) as context:
validate_rpm.validate_signature()
self.assertIsInstance(context.exception, AssertionError)

mock_execute.assert_called_once_with(
'rpm -K -v /tmp/trytytyuit/example.rpm', '.'
)

@patch('validation_workflow.rpm.validation_rpm.execute')
@patch('validation_workflow.rpm.validation_rpm.logging.info')
@patch('validation_workflow.rpm.validation_rpm.ValidationArgs')
@patch('system.temporary_directory.TemporaryDirectory')
def test_validate_signature_except(self, mock_temporary_directory: Mock, mock_validation_args: Mock, mock_logging_info: Mock, mock_execute: Mock) -> None:
mock_execute.return_value = (None,
'/tmp/trytytyuit/example.rpm\n'
'Header V4 RSA/SHA512 Signature, key ID 9310d3fc: OK\nHeader SHA256 digest: OK\n'
'Header SHA1 digest: OK\nPayload SHA256 digest: OK\nV4 RSA/SHA512 Signature, key ID 9310d3fc: OK\n',
None)

validate_rpm = ValidateRpm(mock_validation_args.return_value, mock_temporary_directory.return_value)
mock_temporary_directory.return_value.path = "/tmp/trytytyuit/"
validate_rpm.filename = 'example.rpm'

with self.assertRaises(AssertionError) as context:
validate_rpm.validate_signature()
mock_logging_info.assert_any_call('Key digest "Header SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Key digest "Payload SHA256 digest" is validated to be present.')
mock_logging_info.assert_any_call('Validation of all key digests starts: ')
self.assertIsInstance(context.exception, AssertionError)

mock_execute.assert_called_once_with(
'rpm -K -v /tmp/trytytyuit/example.rpm', '.'
)
Loading