Skip to content

Commit

Permalink
fixed typing and tests improvement
Browse files Browse the repository at this point in the history
As ``warehouse.tuf.repository`` is using typing, the mypy found some
issues and it was fixed.
Some tests improvements, added some monkeypatch and new asserts for call
recorded using pretend.

Signed-off-by: Kairo de Araujo <kdearaujo@vmware.com>
  • Loading branch information
Kairo de Araujo committed Mar 31, 2022
1 parent 5ae58c6 commit 6dc98c8
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 28 deletions.
31 changes: 29 additions & 2 deletions tests/unit/tuf/test_repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ def test__create_delegated_targets_roles_raises_storageerror(
]
assert tuf_repository._store.calls[0].args[0] == "test_bin"

def test__create_delegated_targets_roles_raise_fileexists(
self, tuf_repository, monkeypatch
def test__create_delegated_targets_roles_missing_delegated_role(
self, tuf_repository
):
fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1)
fake_targets_md = pretend.stub(
Expand All @@ -259,6 +259,33 @@ def test__create_delegated_targets_roles_raise_fileexists(
)
fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={}))

test_delegate_roles_parameters = [
repository.RolesPayload(
expiration=fake_time,
threshold=1,
keys=[{"keyid": "key1"}, {"keyid": "key2"}],
paths=["*/*"],
)
]

with pytest.raises(ValueError) as err:
tuf_repository._create_delegated_targets_roles(
delegator_metadata=fake_targets_md,
delegate_role_parameters=test_delegate_roles_parameters,
snapshot_metadata=fake_snapshot_md,
)

assert "A delegation role name is required." in str(err.value)

def test__create_delegated_targets_roles_raise_fileexists(self, tuf_repository):
fake_time = datetime.datetime(2019, 6, 16, 9, 5, 1)
fake_targets_md = pretend.stub(
signed=pretend.stub(
delegations=None, add_key=pretend.call_recorder(lambda *a, **kw: None)
)
)
fake_snapshot_md = pretend.stub(signed=pretend.stub(meta={}))

tuf_repository.load_role = pretend.call_recorder(
lambda role: fake_snapshot_md if role == Snapshot.type else True
)
Expand Down
32 changes: 18 additions & 14 deletions tests/unit/tuf/test_services.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ def test_get(self, db_request, monkeypatch):
"keyid_hash_algorithms": ["sha256", "sha512"],
}
db_request.registry.settings["tuf.root.secret"] = "tuf.root.secret"
monkeypatch.setattr(glob, "glob", lambda privkey_path: ["fake_root.key"])
monkeypatch.setattr(
"warehouse.tuf.services.import_ed25519_privatekey_from_file",
lambda *a, **kw: expected_priv_key_dict,
Expand Down Expand Up @@ -84,8 +85,6 @@ def test_get(self, monkeypatch):
monkeypatch.setattr(glob, "glob", lambda *a, **kw: ["1.root.json"])

fake_file_object = pretend.stub(
__enter__=None,
__exit__=None,
close=pretend.call_recorder(lambda: None),
read=pretend.call_recorder(lambda: b"fake_root_data"),
)
Expand All @@ -96,7 +95,7 @@ def test_get(self, monkeypatch):
with service.get("root") as r:
result = r.read()

assert result == b"fake_root_data"
assert result == fake_file_object.read()
assert fake_file_object.close.calls == [pretend.call()]

def test_get_max_version_raises_valueerror(self, monkeypatch):
Expand All @@ -105,8 +104,6 @@ def test_get_max_version_raises_valueerror(self, monkeypatch):
monkeypatch.setattr(glob, "glob", lambda *a, **kw: [])

fake_file_object = pretend.stub(
__enter__=None,
__exit__=None,
close=pretend.call_recorder(lambda: None),
read=pretend.call_recorder(lambda: b"fake_root_data"),
)
Expand All @@ -117,7 +114,7 @@ def test_get_max_version_raises_valueerror(self, monkeypatch):
with service.get("root") as r:
result = r.read()

assert result == b"fake_root_data"
assert result == fake_file_object.read()
assert fake_file_object.close.calls == [pretend.call()]

def test_get_oserror(self, monkeypatch):
Expand All @@ -144,8 +141,6 @@ def test_get_specific_version(self, monkeypatch):
)

fake_file_object = pretend.stub(
__enter__=None,
__exit__=None,
close=pretend.call_recorder(lambda: None),
read=pretend.call_recorder(lambda: b"fake_data"),
)
Expand All @@ -156,7 +151,7 @@ def test_get_specific_version(self, monkeypatch):
with service.get("root", version=2) as r:
result = r.read()

assert result == b"fake_data"
assert result == fake_file_object.read()
assert fake_file_object.close.calls == [pretend.call()]

def test_get_timestamp_specific(self, monkeypatch):
Expand All @@ -165,8 +160,6 @@ def test_get_timestamp_specific(self, monkeypatch):
monkeypatch.setattr(glob, "glob", lambda *a, **kw: ["timestamp.json"])

fake_file_object = pretend.stub(
__enter__=None,
__exit__=None,
close=pretend.call_recorder(lambda: None),
read=pretend.call_recorder(lambda: b"fake_data"),
)
Expand All @@ -177,7 +170,7 @@ def test_get_timestamp_specific(self, monkeypatch):
with service.get("timestamp") as r:
result = r.read()

assert result == b"fake_data"
assert result == fake_file_object.read()

def test_put(self, monkeypatch):
service = services.LocalStorageService("/opt/warehouse/src/dev/metadata")
Expand Down Expand Up @@ -312,6 +305,10 @@ def test_create_service(self):
assert service._storage_backend == fake_service
assert service._key_storage_backend == fake_service
assert service._request == request
assert request.find_service.calls == [
pretend.call(IStorageService),
pretend.call(IKeyService),
]

def test_basic_init(self):
service = services.RepositoryService(
Expand Down Expand Up @@ -451,8 +448,15 @@ def test_init_repository(self, db_request, monkeypatch):
service = services.RepositoryService(fake_storage, fake_key_storage, db_request)
result = service.init_repository()

call_args = fake_metadata_repository.initialize.calls[0].args[0]
assert result is None
# one call for role (4)
assert fake_datetime.now.calls == [
pretend.call(),
pretend.call(),
pretend.call(),
pretend.call(),
]
call_args = fake_metadata_repository.initialize.calls[0].args[0]
assert str(call_args["snapshot"].expiration) == "2019-06-17 09:05:01"
assert str(call_args["timestamp"].expiration) == "2019-06-17 09:05:01"
assert str(call_args["root"].expiration) == "2020-06-15 09:05:01"
Expand Down Expand Up @@ -549,9 +553,9 @@ def test_init_targets_delegation(self, db_request, monkeypatch):
service = services.RepositoryService(fake_storage, fake_key_storage, db_request)
service.bump_snapshot = pretend.call_recorder(lambda snapshot_metadata: None)
result = service.init_targets_delegation()
call_args = fake_metadata_repository.delegate_targets_roles.calls[0].args[0]

assert result is None
call_args = fake_metadata_repository.delegate_targets_roles.calls[0].args[0]
assert sorted(["targets", "bins"]) == sorted(list(call_args.keys()))
assert len(call_args["targets"]) == 1
assert call_args["targets"][0].paths == ["*/*", "*/*/*/*"]
Expand Down
5 changes: 5 additions & 0 deletions tests/unit/tuf/test_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def __exit__(self, type, value, traceback):
tasks.bump_snapshot(task, db_request)

assert db_request.find_service.calls == [pretend.call(IRepositoryService)]
assert fake_irepository.bump_snapshot.calls == [pretend.call()]


class TestBumpBinNRoles:
Expand Down Expand Up @@ -81,6 +82,7 @@ def __exit__(self, type, value, traceback):
tasks.bump_bin_n_roles(task, db_request)

assert db_request.find_service.calls == [pretend.call(IRepositoryService)]
assert fake_irepository.bump_bin_n_roles.calls == [pretend.call()]


class TestInitRepository:
Expand All @@ -97,6 +99,7 @@ def test_success(self, db_request):
task = pretend.stub()
tasks.init_repository(task, db_request)

assert fake_irepository.init_repository.calls == [pretend.call()]
assert db_request.find_service.calls == [pretend.call(IRepositoryService)]


Expand Down Expand Up @@ -130,6 +133,7 @@ def __exit__(self, type, value, traceback):
task = pretend.stub()
tasks.init_targets_delegation(task, db_request)

assert fake_irepository.init_targets_delegation.calls == [pretend.call()]
assert db_request.find_service.calls == [pretend.call(IRepositoryService)]


Expand Down Expand Up @@ -167,4 +171,5 @@ def __exit__(self, type, value, traceback):
task = pretend.stub()
tasks.add_hashed_targets(task, db_request, targets)

fake_irepository.add_hashed_targets.calls == [pretend.call()]
assert db_request.find_service.calls == [pretend.call(IRepositoryService)]
6 changes: 4 additions & 2 deletions warehouse/cli/tuf.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@

import click

from securesystemslib.exceptions import StorageError
from securesystemslib.interface import generate_and_write_ed25519_keypair
from securesystemslib.exceptions import StorageError # type: ignore
from securesystemslib.interface import ( # type: ignore
generate_and_write_ed25519_keypair,
)

from warehouse.cli import warehouse
from warehouse.packaging.utils import render_simple_detail
Expand Down
18 changes: 10 additions & 8 deletions warehouse/tuf/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
from datetime import datetime
from typing import Any, Dict, List, Optional

from securesystemslib.exceptions import StorageError
from securesystemslib.signer import SSlibSigner
from securesystemslib.exceptions import StorageError # type: ignore
from securesystemslib.signer import SSlibSigner # type: ignore
from tuf.api.metadata import (
SPECIFICATION_VERSION,
TOP_LEVEL_ROLE_NAMES,
Expand Down Expand Up @@ -55,9 +55,9 @@ class RolesPayload:
expiration: datetime
threshold: int
keys: List[Dict[str, Any]]
delegation_role: str = None
paths: List[str] = None
path_hash_prefixes: List[str] = None
delegation_role: Optional[str] = None
paths: Optional[List[str]] = None
path_hash_prefixes: Optional[List[str]] = None


@dataclass
Expand All @@ -66,7 +66,7 @@ class TargetsPayload:
Container for target files info, suitable for targets metadata.
"""

fileinfo: str
fileinfo: Dict[str, Any]
path: str


Expand Down Expand Up @@ -110,6 +110,8 @@ def _create_delegated_targets_roles(

for role_parameter in delegate_role_parameters:
rolename = role_parameter.delegation_role
if rolename is None:
raise ValueError("A delegation role name is required.")
try:
if self.load_role(rolename):
raise FileExistsError(f"Role {rolename} already exists.")
Expand All @@ -120,7 +122,7 @@ def _create_delegated_targets_roles(
name=rolename,
keyids=[key["keyid"] for key in role_parameter.keys],
threshold=role_parameter.threshold,
terminating=None,
terminating=False,
paths=role_parameter.paths,
path_hash_prefixes=role_parameter.path_hash_prefixes,
)
Expand Down Expand Up @@ -196,7 +198,7 @@ def initialize(
Dictionary of role names as keys and metadata objects as values.
``Dict[str, Metadata]``
"""
top_level_roles_metadata = dict()
top_level_roles_metadata: Dict[str, Any] = dict()
if self.is_initialized:
raise FileExistsError("Metadata already exists in the Storage Service")

Expand Down
6 changes: 4 additions & 2 deletions warehouse/tuf/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

from contextlib import contextmanager

from securesystemslib.exceptions import StorageError
from securesystemslib.interface import import_ed25519_privatekey_from_file
from securesystemslib.exceptions import StorageError # type: ignore
from securesystemslib.interface import ( # type: ignore
import_ed25519_privatekey_from_file,
)
from zope.interface import implementer

from warehouse.config import Environment
Expand Down

0 comments on commit 6dc98c8

Please sign in to comment.