Skip to content

Commit

Permalink
Refactoring progress #95
Browse files Browse the repository at this point in the history
Signed-off-by: tdruez <tdruez@nexb.com>
  • Loading branch information
tdruez committed Sep 4, 2024
1 parent ec4f5d4 commit 2ad391b
Show file tree
Hide file tree
Showing 15 changed files with 417 additions and 353 deletions.
82 changes: 0 additions & 82 deletions component_catalog/filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@

from django import forms
from django.contrib.admin.options import IncorrectLookupParameters
from django.db.models import F
from django.utils.functional import cached_property
from django.utils.translation import gettext_lazy as _

Expand All @@ -23,12 +22,10 @@
from dje.filters import HasRelationFilter
from dje.filters import MatchOrderedSearchFilter
from dje.filters import RelatedLookupListFilter
from dje.filters import SearchFilter
from dje.widgets import BootstrapSelectMultipleWidget
from dje.widgets import DropDownRightWidget
from dje.widgets import SortDropDownWidget
from license_library.models import License
from vulnerabilities.models import Vulnerability


class IsVulnerableFilter(HasRelationFilter):
Expand Down Expand Up @@ -283,82 +280,3 @@ def show_created_date(self):
@cached_property
def show_last_modified_date(self):
return not self.sort_value or self.has_sort_by("last_modified_date")


class NullsLastOrderingFilter(django_filters.OrderingFilter):
"""
A custom ordering filter that ensures null values are sorted last.
When sorting by fields with potential null values, this filter modifies the
ordering to use Django's `nulls_last` clause for better handling of null values,
whether in ascending or descending order.
"""

def filter(self, qs, value):
if not value:
return qs

ordering = []
for field in value:
if field.startswith("-"):
field_name = field[1:]
ordering.append(F(field_name).desc(nulls_last=True))
else:
ordering.append(F(field).asc(nulls_last=True))

return qs.order_by(*ordering)


vulnerability_score_ranges = {
"low": (0.1, 3),
"medium": (4.0, 6.9),
"high": (7.0, 8.9),
"critical": (9.0, 10.0),
}

SCORE_CHOICES = [
(key, f"{key.capitalize()} ({value[0]} - {value[1]})")
for key, value in vulnerability_score_ranges.items()
]


class VulnerabilityFilterSet(DataspacedFilterSet):
q = SearchFilter(
label=_("Search"),
search_fields=["vulnerability_id", "aliases"],
)
sort = NullsLastOrderingFilter(
label=_("Sort"),
fields=[
"max_score",
"min_score",
"affected_products_count",
"affected_packages_count",
"fixed_packages_count",
"created_date",
"last_modified_date",
],
widget=SortDropDownWidget,
)
max_score = django_filters.ChoiceFilter(
choices=SCORE_CHOICES,
method="filter_by_score_range",
label="Score Range",
help_text="Select a score range to filter.",
)

class Meta:
model = Vulnerability
fields = [
"q",
]

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.filters["max_score"].extra["widget"] = DropDownRightWidget(anchor=self.anchor)

def filter_by_score_range(self, queryset, name, value):
if value in vulnerability_score_ranges:
low, high = vulnerability_score_ranges[value]
return queryset.filter(max_score__gte=low, max_score__lte=high)
return queryset
16 changes: 3 additions & 13 deletions component_catalog/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from policy.models import SetPolicyFromLicenseMixin
from policy.models import UsagePolicyMixin
from vulnerabilities.models import AffectedByVulnerabilityMixin
from vulnerabilities.models import AffectedByVulnerabilityRelationship
from workflow.models import RequestMixin

logger = logging.getLogger("dje")
Expand Down Expand Up @@ -2502,33 +2503,22 @@ def __str__(self):
return f"{self.package} is under {self.license}."


# TODO: Review cascade
class PackageAffectedByVulnerability(DataspacedModel):
class PackageAffectedByVulnerability(AffectedByVulnerabilityRelationship):
package = models.ForeignKey(
to="component_catalog.Package",
on_delete=models.CASCADE,
)

vulnerability = models.ForeignKey(
to="vulnerabilities.Vulnerability",
on_delete=models.PROTECT,
)

class Meta:
unique_together = (("package", "vulnerability"), ("dataspace", "uuid"))


class ComponentAffectedByVulnerability(DataspacedModel):
class ComponentAffectedByVulnerability(AffectedByVulnerabilityRelationship):
component = models.ForeignKey(
to="component_catalog.Component",
on_delete=models.CASCADE,
)

vulnerability = models.ForeignKey(
to="vulnerabilities.Vulnerability",
on_delete=models.PROTECT,
)

class Meta:
unique_together = (("component", "vulnerability"), ("dataspace", "uuid"))

Expand Down
2 changes: 1 addition & 1 deletion component_catalog/tests/test_admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,7 @@ def test_package_changeform_filename_validation(self):
self.assertEqual(200, response.status_code)
self.assertEqual(errors, response.context_data["adminform"].form.errors)

@mock.patch("component_catalog.models.VulnerabilityMixin.fetch_vulnerabilities")
@mock.patch("vulnerabilities.models.AffectedByVulnerabilityMixin.fetch_vulnerabilities")
def test_package_changeform_fetch_vulnerabilities(self, mock_fetch_vulnerabilities):
mock_fetch_vulnerabilities.return_value = None
self.dataspace1.enable_vulnerablecodedb_access = True
Expand Down
2 changes: 1 addition & 1 deletion component_catalog/tests/test_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

from component_catalog.filters import ComponentFilterSet
from component_catalog.filters import PackageFilterSet
from component_catalog.filters import VulnerabilityFilterSet
from component_catalog.models import Component
from component_catalog.models import ComponentKeyword
from component_catalog.models import ComponentType
Expand All @@ -26,6 +25,7 @@
from license_library.models import License
from organization.models import Owner
from policy.models import UsagePolicy
from vulnerabilities.filters import VulnerabilityFilterSet
from vulnerabilities.tests import make_vulnerability


Expand Down
179 changes: 0 additions & 179 deletions component_catalog/tests/test_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,10 @@
from component_catalog.models import Package
from component_catalog.models import PackageAlreadyExistsWarning
from component_catalog.models import Subcomponent
from component_catalog.tests import make_component
from component_catalog.tests import make_package
from dejacode_toolkit import download
from dejacode_toolkit.download import DataCollectionException
from dejacode_toolkit.download import collect_package_data
from dejacode_toolkit.vulnerablecode import VulnerableCode
from dje.copier import copy_object
from dje.models import Dataspace
from dje.models import History
Expand All @@ -57,8 +55,6 @@
from license_library.models import LicenseTag
from organization.models import Owner
from product_portfolio.tests import make_product
from vulnerabilities.models import Vulnerability
from vulnerabilities.tests import make_vulnerability


class ComponentCatalogModelsTestCase(TestCase):
Expand Down Expand Up @@ -2585,178 +2581,3 @@ def test_vulnerability_mixin_is_vulnerable_property(self):
package2 = make_package(self.dataspace)
self.assertTrue(package1.is_vulnerable)
self.assertFalse(package2.is_vulnerable)

@mock.patch("dejacode_toolkit.vulnerablecode.VulnerableCode.request_get")
def test_vulnerability_mixin_get_entry_for_package(self, mock_request_get):
vulnerablecode = VulnerableCode(self.dataspace)
package1 = make_package(self.dataspace, package_url="pkg:composer/guzzlehttp/psr7@1.9.0")
response_file = self.data / "vulnerabilities" / "idna_3.6_response.json"
mock_request_get.return_value = json.loads(response_file.read_text())

affected_by_vulnerabilities = package1.get_entry_for_package(vulnerablecode)
self.assertEqual(1, len(affected_by_vulnerabilities))
self.assertEqual("VCID-j3au-usaz-aaag", affected_by_vulnerabilities[0]["vulnerability_id"])

@mock.patch("component_catalog.models.VulnerabilityMixin.get_entry_for_package")
@mock.patch("dejacode_toolkit.vulnerablecode.VulnerableCode.is_configured")
def test_vulnerability_mixin_get_entry_from_vulnerablecode(
self, mock_is_configured, mock_get_entry_for_package
):
package1 = make_package(self.dataspace, package_url="pkg:pypi/idna@3.6")
self.assertIsNone(package1.get_entry_from_vulnerablecode())

mock_get_entry_for_package.return_value = None
self.dataspace.enable_vulnerablecodedb_access = True
self.dataspace.save()
mock_is_configured.return_value = True
package1.get_entry_from_vulnerablecode()
mock_get_entry_for_package.assert_called_once()

@mock.patch("dejacode_toolkit.vulnerablecode.VulnerableCode.request_get")
@mock.patch("dejacode_toolkit.vulnerablecode.VulnerableCode.is_configured")
def test_vulnerability_mixin_fetch_vulnerabilities(self, mock_is_configured, mock_request_get):
mock_is_configured.return_value = True
self.dataspace.enable_vulnerablecodedb_access = True
self.dataspace.save()
response_file = self.data / "vulnerabilities" / "idna_3.6_response.json"
mock_request_get.return_value = json.loads(response_file.read_text())

package1 = make_package(self.dataspace, package_url="pkg:pypi/idna@3.6")
package1.fetch_vulnerabilities()

self.assertEqual(1, Vulnerability.objects.scope(self.dataspace).count())
self.assertEqual(1, package1.affected_by_vulnerabilities.count())
vulnerability = package1.affected_by_vulnerabilities.get()
self.assertEqual("VCID-j3au-usaz-aaag", vulnerability.vulnerability_id)

def test_vulnerability_mixin_create_vulnerabilities(self):
response_file = self.data / "vulnerabilities" / "idna_3.6_response.json"
response_json = json.loads(response_file.read_text())
vulnerabilities_data = response_json["results"][0]["affected_by_vulnerabilities"]

package1 = make_package(self.dataspace, package_url="pkg:pypi/idna@3.6")
package1.create_vulnerabilities(vulnerabilities_data)

self.assertEqual(1, Vulnerability.objects.scope(self.dataspace).count())
self.assertEqual(1, package1.affected_by_vulnerabilities.count())
vulnerability = package1.affected_by_vulnerabilities.get()
self.assertEqual("VCID-j3au-usaz-aaag", vulnerability.vulnerability_id)

def test_vulnerability_model_affected_packages_m2m(self):
package1 = make_package(self.dataspace)
vulnerablity1 = make_vulnerability(dataspace=self.dataspace, affecting=package1)
self.assertEqual(package1, vulnerablity1.affected_packages.get())
self.assertEqual(vulnerablity1, package1.affected_by_vulnerabilities.get())

def test_vulnerability_model_add_affected(self):
vulnerablity1 = make_vulnerability(dataspace=self.dataspace)
package1 = make_package(self.dataspace)
package2 = make_package(self.dataspace)
vulnerablity1.add_affected(package1)
vulnerablity1.add_affected([package2])
self.assertEqual(2, vulnerablity1.affected_packages.count())

vulnerablity2 = make_vulnerability(dataspace=self.dataspace)
component1 = make_component(self.dataspace)
vulnerablity2.add_affected([component1, package1])
self.assertQuerySetEqual(vulnerablity2.affected_packages.all(), [package1])
self.assertQuerySetEqual(vulnerablity2.affected_components.all(), [component1])

def test_vulnerability_model_fixed_packages_count_generated_field(self):
vulnerablity1 = make_vulnerability(dataspace=self.dataspace)
self.assertEqual(0, vulnerablity1.fixed_packages_count)

vulnerablity1.fixed_packages = [
{"purl": "pkg:pypi/gitpython@3.1.41", "is_vulnerable": True},
{"purl": "pkg:pypi/gitpython@3.2", "is_vulnerable": False},
]
vulnerablity1.save()
vulnerablity1.refresh_from_db()
self.assertEqual(2, vulnerablity1.fixed_packages_count)

def test_vulnerability_model_create_from_data(self):
package1 = make_package(self.dataspace)
vulnerability_data = {
"vulnerability_id": "VCID-q4q6-yfng-aaag",
"summary": "In Django 3.2 before 3.2.25, 4.2 before 4.2.11, and 5.0.",
"aliases": ["CVE-2024-27351", "GHSA-vm8q-m57g-pff3", "PYSEC-2024-47"],
"references": [
{
"reference_url": "https://access.redhat.com/hydra/rest/"
"securitydata/cve/CVE-2024-27351.json",
"reference_id": "",
"scores": [
{
"value": "7.5",
"scoring_system": "cvssv3",
"scoring_elements": "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:N/I:N/A:H",
}
],
},
],
}

vulnerability1 = Vulnerability.create_from_data(
dataspace=self.dataspace,
data=vulnerability_data,
affecting=package1,
)
self.assertEqual(vulnerability_data["vulnerability_id"], vulnerability1.vulnerability_id)
self.assertEqual(vulnerability_data["summary"], vulnerability1.summary)
self.assertEqual(vulnerability_data["aliases"], vulnerability1.aliases)
self.assertEqual(vulnerability_data["references"], vulnerability1.references)
self.assertEqual(7.5, vulnerability1.min_score)
self.assertEqual(7.5, vulnerability1.max_score)
self.assertQuerySetEqual(vulnerability1.affected_packages.all(), [package1])

def test_vulnerability_model_create_from_data_computed_scores(self):
response_file = self.data / "vulnerabilities" / "idna_3.6_response.json"
json_data = json.loads(response_file.read_text())
affected_by_vulnerabilities = json_data["results"][0]["affected_by_vulnerabilities"]
vulnerability1 = Vulnerability.create_from_data(
dataspace=self.dataspace,
data=affected_by_vulnerabilities[0],
)
self.assertEqual(2.1, vulnerability1.min_score)
self.assertEqual(7.5, vulnerability1.max_score)

def test_vulnerability_model_queryset_count_methods(self):
package1 = make_package(self.dataspace)
package2 = make_package(self.dataspace)
vulnerablity1 = make_vulnerability(dataspace=self.dataspace)
vulnerablity1.add_affected([package1, package2])
make_product(self.dataspace, inventory=[package1, package2])

qs = (
Vulnerability.objects.scope(self.dataspace)
.with_affected_products_count()
.with_affected_packages_count()
)
self.assertEqual(2, qs[0].affected_packages_count)
self.assertEqual(1, qs[0].affected_products_count)

def test_vulnerability_model_as_cyclonedx(self):
response_file = self.data / "vulnerabilities" / "idna_3.6_response.json"
json_data = json.loads(response_file.read_text())
affected_by_vulnerabilities = json_data["results"][0]["affected_by_vulnerabilities"]
vulnerability1 = Vulnerability.create_from_data(
dataspace=self.dataspace,
data=affected_by_vulnerabilities[0],
)
package1 = make_package(
self.dataspace,
package_url="pkg:type/name@1.9.0",
uuid="dd0afd00-89bd-46d6-b1f0-57b553c44d32",
)

vulnerability1_as_cdx = vulnerability1.as_cyclonedx(affected_instances=[package1])
as_dict = json.loads(vulnerability1_as_cdx.as_json())
as_dict.pop("ratings", None) # The sorting is inconsistent
results = json.dumps(as_dict, indent=2)

expected_location = self.data / "vulnerabilities" / "idna_3.6_as_cyclonedx.json"
# Uncomment to regen the expected results
# if True:
# expected_location.write_text(results)

self.assertJSONEqual(results, expected_location.read_text())
Loading

0 comments on commit 2ad391b

Please sign in to comment.