diff --git a/tests/unit/tuf/test_repository.py b/tests/unit/tuf/test_repository.py index 3b369ae92a3d..edbe0c1c556b 100644 --- a/tests/unit/tuf/test_repository.py +++ b/tests/unit/tuf/test_repository.py @@ -248,8 +248,8 @@ def test__create_delegated_targets_roles_raises_storageerror( ] assert tuf_repository._store.calls[0].args[0] == "test_bin" - def test__create_delegated_targets_roles_raise_fileexists( - self, tuf_repository, monkeypatch + def test__create_delegated_targets_roles_missing_delegated_role( + self, tuf_repository ): fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) fake_targets_md = pretend.stub( @@ -259,6 +259,33 @@ def test__create_delegated_targets_roles_raise_fileexists( ) fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={})) + test_delegate_roles_parameters = [ + repository.RolesPayload( + expiration=fake_time, + threshold=1, + keys=[{"keyid": "key1"}, {"keyid": "key2"}], + paths=["*/*"], + ) + ] + + with pytest.raises(ValueError) as err: + tuf_repository._create_delegated_targets_roles( + delegator_metadata=fake_targets_md, + delegate_role_parameters=test_delegate_roles_parameters, + snapshot_metadata=fake_snapshot_md, + ) + + assert "A delegation role name is required." in str(err.value) + + def test__create_delegated_targets_roles_raise_fileexists(self, tuf_repository): + fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1) + fake_targets_md = pretend.stub( + signed=pretend.stub( + delegations=None, add_key=pretend.call_recorder(lambda *a, **kw: None) + ) + ) + fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={})) + tuf_repository.load_role = pretend.call_recorder( lambda role: fake_snapshot_md if role == Snapshot.type else True ) diff --git a/tests/unit/tuf/test_services.py b/tests/unit/tuf/test_services.py index 7e75f2d2542e..01b4d24c12bd 100644 --- a/tests/unit/tuf/test_services.py +++ b/tests/unit/tuf/test_services.py @@ -54,6 +54,7 @@ def test_get(self, db_request, monkeypatch): "keyid_hash_algorithms": ["sha256", "sha512"], } db_request.registry.settings["tuf.root.secret"] = "tuf.root.secret" + monkeypatch.setattr(glob, "glob", lambda privkey_path: ["fake_root.key"]) monkeypatch.setattr( "warehouse.tuf.services.import_ed25519_privatekey_from_file", lambda *a, **kw: expected_priv_key_dict, @@ -84,8 +85,6 @@ def test_get(self, monkeypatch): monkeypatch.setattr(glob, "glob", lambda *a, **kw: ["1.root.json"]) fake_file_object = pretend.stub( - __enter__=None, - __exit__=None, close=pretend.call_recorder(lambda: None), read=pretend.call_recorder(lambda: b"fake_root_data"), ) @@ -96,7 +95,7 @@ def test_get(self, monkeypatch): with service.get("root") as r: result = r.read() - assert result == b"fake_root_data" + assert result == fake_file_object.read() assert fake_file_object.close.calls == [pretend.call()] def test_get_max_version_raises_valueerror(self, monkeypatch): @@ -105,8 +104,6 @@ def test_get_max_version_raises_valueerror(self, monkeypatch): monkeypatch.setattr(glob, "glob", lambda *a, **kw: []) fake_file_object = pretend.stub( - __enter__=None, - __exit__=None, close=pretend.call_recorder(lambda: None), read=pretend.call_recorder(lambda: b"fake_root_data"), ) @@ -117,7 +114,7 @@ def test_get_max_version_raises_valueerror(self, monkeypatch): with service.get("root") as r: result = r.read() - assert result == b"fake_root_data" + assert result == fake_file_object.read() assert fake_file_object.close.calls == [pretend.call()] def test_get_oserror(self, monkeypatch): @@ -144,8 +141,6 @@ def test_get_specific_version(self, monkeypatch): ) fake_file_object = pretend.stub( - __enter__=None, - __exit__=None, close=pretend.call_recorder(lambda: None), read=pretend.call_recorder(lambda: b"fake_data"), ) @@ -156,7 +151,7 @@ def test_get_specific_version(self, monkeypatch): with service.get("root", version=2) as r: result = r.read() - assert result == b"fake_data" + assert result == fake_file_object.read() assert fake_file_object.close.calls == [pretend.call()] def test_get_timestamp_specific(self, monkeypatch): @@ -165,8 +160,6 @@ def test_get_timestamp_specific(self, monkeypatch): monkeypatch.setattr(glob, "glob", lambda *a, **kw: ["timestamp.json"]) fake_file_object = pretend.stub( - __enter__=None, - __exit__=None, close=pretend.call_recorder(lambda: None), read=pretend.call_recorder(lambda: b"fake_data"), ) @@ -177,7 +170,7 @@ def test_get_timestamp_specific(self, monkeypatch): with service.get("timestamp") as r: result = r.read() - assert result == b"fake_data" + assert result == fake_file_object.read() def test_put(self, monkeypatch): service = services.LocalStorageService("/opt/warehouse/src/dev/metadata") @@ -312,6 +305,10 @@ def test_create_service(self): assert service._storage_backend == fake_service assert service._key_storage_backend == fake_service assert service._request == request + assert request.find_service.calls == [ + pretend.call(IStorageService), + pretend.call(IKeyService), + ] def test_basic_init(self): service = services.RepositoryService( @@ -451,8 +448,15 @@ def test_init_repository(self, db_request, monkeypatch): service = services.RepositoryService(fake_storage, fake_key_storage, db_request) result = service.init_repository() - call_args = fake_metadata_repository.initialize.calls[0].args[0] assert result is None + # one call for role (4) + assert fake_datetime.now.calls == [ + pretend.call(), + pretend.call(), + pretend.call(), + pretend.call(), + ] + call_args = fake_metadata_repository.initialize.calls[0].args[0] assert str(call_args["snapshot"].expiration) == "2019-06-17 09:05:01" assert str(call_args["timestamp"].expiration) == "2019-06-17 09:05:01" assert str(call_args["root"].expiration) == "2020-06-15 09:05:01" @@ -549,9 +553,9 @@ def test_init_targets_delegation(self, db_request, monkeypatch): service = services.RepositoryService(fake_storage, fake_key_storage, db_request) service.bump_snapshot = pretend.call_recorder(lambda snapshot_metadata: None) result = service.init_targets_delegation() - call_args = fake_metadata_repository.delegate_targets_roles.calls[0].args[0] assert result is None + call_args = fake_metadata_repository.delegate_targets_roles.calls[0].args[0] assert sorted(["targets", "bins"]) == sorted(list(call_args.keys())) assert len(call_args["targets"]) == 1 assert call_args["targets"][0].paths == ["*/*", "*/*/*/*"] diff --git a/tests/unit/tuf/test_tasks.py b/tests/unit/tuf/test_tasks.py index c68484d2f63a..eacd454419eb 100644 --- a/tests/unit/tuf/test_tasks.py +++ b/tests/unit/tuf/test_tasks.py @@ -48,6 +48,7 @@ def __exit__(self, type, value, traceback): tasks.bump_snapshot(task, db_request) assert db_request.find_service.calls == [pretend.call(IRepositoryService)] + assert fake_irepository.bump_snapshot.calls == [pretend.call()] class TestBumpBinNRoles: @@ -81,6 +82,7 @@ def __exit__(self, type, value, traceback): tasks.bump_bin_n_roles(task, db_request) assert db_request.find_service.calls == [pretend.call(IRepositoryService)] + assert fake_irepository.bump_bin_n_roles.calls == [pretend.call()] class TestInitRepository: @@ -97,6 +99,7 @@ def test_success(self, db_request): task = pretend.stub() tasks.init_repository(task, db_request) + assert fake_irepository.init_repository.calls == [pretend.call()] assert db_request.find_service.calls == [pretend.call(IRepositoryService)] @@ -130,6 +133,7 @@ def __exit__(self, type, value, traceback): task = pretend.stub() tasks.init_targets_delegation(task, db_request) + assert fake_irepository.init_targets_delegation.calls == [pretend.call()] assert db_request.find_service.calls == [pretend.call(IRepositoryService)] @@ -167,4 +171,5 @@ def __exit__(self, type, value, traceback): task = pretend.stub() tasks.add_hashed_targets(task, db_request, targets) + fake_irepository.add_hashed_targets.calls == [pretend.call()] assert db_request.find_service.calls == [pretend.call(IRepositoryService)] diff --git a/warehouse/cli/tuf.py b/warehouse/cli/tuf.py index 071224fd8be6..3f32b22d89ee 100644 --- a/warehouse/cli/tuf.py +++ b/warehouse/cli/tuf.py @@ -12,8 +12,10 @@ import click -from securesystemslib.exceptions import StorageError -from securesystemslib.interface import generate_and_write_ed25519_keypair +from securesystemslib.exceptions import StorageError # type: ignore +from securesystemslib.interface import ( # type: ignore + generate_and_write_ed25519_keypair, +) from warehouse.cli import warehouse from warehouse.packaging.utils import render_simple_detail diff --git a/warehouse/tuf/repository.py b/warehouse/tuf/repository.py index 4cd7ecfeda02..f6a5844d6e4a 100644 --- a/warehouse/tuf/repository.py +++ b/warehouse/tuf/repository.py @@ -14,8 +14,8 @@ from datetime import datetime from typing import Any, Dict, List, Optional -from securesystemslib.exceptions import StorageError -from securesystemslib.signer import SSlibSigner +from securesystemslib.exceptions import StorageError # type: ignore +from securesystemslib.signer import SSlibSigner # type: ignore from tuf.api.metadata import ( SPECIFICATION_VERSION, TOP_LEVEL_ROLE_NAMES, @@ -55,9 +55,9 @@ class RolesPayload: expiration: datetime threshold: int keys: List[Dict[str, Any]] - delegation_role: str = None - paths: List[str] = None - path_hash_prefixes: List[str] = None + delegation_role: Optional[str] = None + paths: Optional[List[str]] = None + path_hash_prefixes: Optional[List[str]] = None @dataclass @@ -66,7 +66,7 @@ class TargetsPayload: Container for target files info, suitable for targets metadata. """ - fileinfo: str + fileinfo: Dict[str, Any] path: str @@ -110,6 +110,8 @@ def _create_delegated_targets_roles( for role_parameter in delegate_role_parameters: rolename = role_parameter.delegation_role + if rolename is None: + raise ValueError("A delegation role name is required.") try: if self.load_role(rolename): raise FileExistsError(f"Role {rolename} already exists.") @@ -120,7 +122,7 @@ def _create_delegated_targets_roles( name=rolename, keyids=[key["keyid"] for key in role_parameter.keys], threshold=role_parameter.threshold, - terminating=None, + terminating=False, paths=role_parameter.paths, path_hash_prefixes=role_parameter.path_hash_prefixes, ) @@ -196,7 +198,7 @@ def initialize( Dictionary of role names as keys and metadata objects as values. ``Dict[str, Metadata]`` """ - top_level_roles_metadata = dict() + top_level_roles_metadata: Dict[str, Any] = dict() if self.is_initialized: raise FileExistsError("Metadata already exists in the Storage Service") diff --git a/warehouse/tuf/services.py b/warehouse/tuf/services.py index cb8ce967df6a..a7558d7a0bce 100644 --- a/warehouse/tuf/services.py +++ b/warehouse/tuf/services.py @@ -19,8 +19,10 @@ from contextlib import contextmanager -from securesystemslib.exceptions import StorageError -from securesystemslib.interface import import_ed25519_privatekey_from_file +from securesystemslib.exceptions import StorageError # type: ignore +from securesystemslib.interface import ( # type: ignore + import_ed25519_privatekey_from_file, +) from zope.interface import implementer from warehouse.config import Environment