diff --git a/collectors/nvd/collectors.py b/collectors/nvd/collectors.py index 298e446a2..0461a5728 100644 --- a/collectors/nvd/collectors.py +++ b/collectors/nvd/collectors.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import List, Union import nvdlib from celery.utils.log import get_task_logger @@ -65,6 +65,17 @@ def response2result(self, vulnerabilities: list) -> list: filtering out everything unnecessary and simplifying """ + def get_cpe_list(data: CVE) -> List[str]: + """ + Return a list of CPEs from the CVE `data` + """ + cpe_list = [] + if hasattr(data, "cpe"): + for entry in data.cpe: + cpe_list.append(entry.criteria) + + return cpe_list + def get_cvss_metric(data: CVE, version: str) -> Union[dict, None]: """ Return CVSS metric from `data` for the given `version`. @@ -104,6 +115,7 @@ def get_cvss_metric(data: CVE, version: str) -> Union[dict, None]: ], ) ), + "nvd_cpes": get_cpe_list(vulnerability), } ) diff --git a/collectors/nvd/tests/test_collectors.py b/collectors/nvd/tests/test_collectors.py index ef5ff9f55..be4a102a7 100644 --- a/collectors/nvd/tests/test_collectors.py +++ b/collectors/nvd/tests/test_collectors.py @@ -236,6 +236,20 @@ def test_cvss4(self): flaw = Flaw.objects.get(cve_id=cve_id) assert flaw.cvss_scores.filter(version=FlawCVSS.CVSSVersion.VERSION4) + @pytest.mark.vcr + def test_cpe_load(self): + """ + Test that CPE values are correctly loaded in the Flaw model. + """ + cve_id = "CVE-2020-1234" + FlawFactory(cve_id=cve_id) + + nvdc = NVDCollector() + nvdc.collect(cve_id) + + flaw = Flaw.objects.get(cve_id=cve_id) + assert len(flaw.nvd_cpes) > 0 + @pytest.mark.parametrize( "old_flag,new_flag", [ @@ -257,6 +271,7 @@ def test_cvss4(self): ), ], ) + def test_reset_flag_on_removal(self, old_flag, new_flag): """ test that NIST CVSS validation flag is correctly adjusted when NVD CVSSv3 is removed
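Review bot: The list returned by `get_cpe_list` keeps only each entry's `criteria` string, so it loses the match entry's `vulnerable` flag and any version-range bounds NVD attaches to that criteria. If nvdlib exposes those fields on the entries of `data.cpe` (worth confirming against a recorded response), `nvd_cpes` can hold platform CPEs that only describe the environment a product runs on, and wildcard-version criteria that read as "all versions"; a consumer matching flaws to products by this list would over-match. At minimum the docstring should say so, e.g. `Return the raw CPE criteria strings from the CVE data, including non-vulnerable platform entries and without version ranges`. The result could also be made duplicate-free and stable across runs with `return sorted({entry.criteria for entry in getattr(data, "cpe", [])})`. The enclosing `response2result` starts and ends outside this hunk.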
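Review bot: The new `"nvd_cpes"` value is third-party data passed through unchecked: whatever NVD returns as `criteria` goes straight into the result dict. The code that stores this key on the flaw is not part of this diff, so it is unclear whether anything checks the shape of each entry before it is saved. A cheap guard at this boundary would keep only well-formed entries, e.g. `[c for c in get_cpe_list(vulnerability) if isinstance(c, str) and c.startswith("cpe:2.3:")]`, and log whatever is dropped. The surrounding dict literal and loop begin outside the hunk.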
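Review bot: `assert len(flaw.nvd_cpes) > 0` in `test_cpe_load` only shows that something was stored; it would still pass if the collector saved the wrong field or a partial list. Since the test replays a recorded response via `@pytest.mark.vcr`, it can assert the exact value, e.g. `assert flaw.nvd_cpes == EXPECTED_CPES` with `EXPECTED_CPES` (placeholder name) copied from the cassette. A second case for a CVE without configurations would pin the empty-list path of `get_cpe_list`. The cassette itself is not visible in this diff, so confirm it is committed. Separately, the last hunk of this file appears to add only a blank line between the `parametrize` decorator and `def test_reset_flag_on_removal`, which pycodestyle reports as E304.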