From 7c1734f0f62f0b3369795d5f83868d734f73fb17 Mon Sep 17 00:00:00 2001 From: Jon Moroney Date: Thu, 23 Apr 2026 11:31:24 -0700 Subject: [PATCH 1/6] Add helper function to pull cpe list Adds a helper function to extract nvd cpe data and adds it to the list of data returned Partial implementation for OSIDB-4923 --- collectors/nvd/collectors.py | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/collectors/nvd/collectors.py b/collectors/nvd/collectors.py index 298e446a2..3754db491 100644 --- a/collectors/nvd/collectors.py +++ b/collectors/nvd/collectors.py @@ -1,4 +1,4 @@ -from typing import Union +from typing import List, Optional, Union import nvdlib from celery.utils.log import get_task_logger @@ -65,6 +65,20 @@ def response2result(self, vulnerabilities: list) -> list: filtering out everything unnecessary and simplifying """ + def get_cpe_list(data: CVE) -> Optional[List[str]]: + """ + Return a list of CPEs from the CVE `data` + """ + cpe_list = [] + if "cpe" in data and len(data.cpe) > 0: + for entry in data.cpe: + cpe_list.append(entry.criteria) + + if len(cpe_list) > 0: + return cpe_list + else: + return None + def get_cvss_metric(data: CVE, version: str) -> Union[dict, None]: """ Return CVSS metric from `data` for the given `version`. @@ -104,6 +118,7 @@ def get_cvss_metric(data: CVE, version: str) -> Union[dict, None]: ], ) ), + "nvd_cpes": get_cpe_list(vulnerability), } ) From bb2d409453e198c5b09c1ee793a479223507154c Mon Sep 17 00:00:00 2001 From: Jon Moroney Date: Mon, 27 Apr 2026 11:19:05 -0700 Subject: [PATCH 2/6] Return an empty list instead of a None value --- collectors/nvd/collectors.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/collectors/nvd/collectors.py b/collectors/nvd/collectors.py index 3754db491..f9c1823b9 100644 --- a/collectors/nvd/collectors.py +++ b/collectors/nvd/collectors.py @@ -74,10 +74,7 @@ def get_cpe_list(data: CVE) -> Optional[List[str]]: for entry in data.cpe: cpe_list.append(entry.criteria) - if len(cpe_list) > 0: - return cpe_list - else: - return None + return cpe_list def get_cvss_metric(data: CVE, version: str) -> Union[dict, None]: """ From fa056a019a3a98467e2e9bde8b406b3db4236739 Mon Sep 17 00:00:00 2001 From: Jon Moroney Date: Mon, 27 Apr 2026 11:20:59 -0700 Subject: [PATCH 3/6] reduced cpe field check --- collectors/nvd/collectors.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/collectors/nvd/collectors.py b/collectors/nvd/collectors.py index f9c1823b9..650698f6e 100644 --- a/collectors/nvd/collectors.py +++ b/collectors/nvd/collectors.py @@ -70,7 +70,7 @@ def get_cpe_list(data: CVE) -> Optional[List[str]]: Return a list of CPEs from the CVE `data` """ cpe_list = [] - if "cpe" in data and len(data.cpe) > 0: + if hasattr(data, "cpe"): for entry in data.cpe: cpe_list.append(entry.criteria) From 6bb043b49e99feb2c7192bed2b8ae43d292f5d8d Mon Sep 17 00:00:00 2001 From: Jon Moroney Date: Mon, 27 Apr 2026 12:17:49 -0700 Subject: [PATCH 4/6] Remove optional value --- collectors/nvd/collectors.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/collectors/nvd/collectors.py b/collectors/nvd/collectors.py index 650698f6e..0461a5728 100644 --- a/collectors/nvd/collectors.py +++ b/collectors/nvd/collectors.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Union +from typing import List, Union import nvdlib from celery.utils.log import get_task_logger @@ -65,7 +65,7 @@ def response2result(self, vulnerabilities: list) -> list: filtering out everything unnecessary and simplifying """ - def get_cpe_list(data: CVE) -> Optional[List[str]]: + def get_cpe_list(data: CVE) -> List[str]: """ Return a list of CPEs from the CVE `data` """ From a36d0347ced73e8ed5a17938eb051c37fb96dc5f Mon Sep 17 00:00:00 2001 From: Jon Moroney Date: Mon, 27 Apr 2026 12:29:42 -0700 Subject: [PATCH 5/6] Stub test. Need to work test on another machine --- collectors/nvd/tests/test_collectors.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/collectors/nvd/tests/test_collectors.py b/collectors/nvd/tests/test_collectors.py index ef5ff9f55..ade9e6d31 100644 --- a/collectors/nvd/tests/test_collectors.py +++ b/collectors/nvd/tests/test_collectors.py @@ -257,6 +257,21 @@ def test_cvss4(self): ), ], ) + @pytest.mark.vcr + def test_cpe_load(self): + """ + Test that CPE values are correctly loaded in the Flaw model. + """ + cve_id = "CVE-2020-1234" + FlawFactory(cve_id=cve_id) + + nvdc = NVDCollector() + nvdc.collect(cve_id) + + flaw = Flaw.objects.get(cve_id=cve_id) + assert hasattr(flaw, "cpe") + assert False + def test_reset_flag_on_removal(self, old_flag, new_flag): """ test that NIST CVSS validation flag is correctly adjusted when NVD CVSSv3 is removed From 9169a8279584980dbf54c365a7488bb74f317bcc Mon Sep 17 00:00:00 2001 From: Jon Moroney Date: Mon, 27 Apr 2026 12:42:33 -0700 Subject: [PATCH 6/6] Move test and look for length for the moment --- collectors/nvd/tests/test_collectors.py | 28 ++++++++++++------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/collectors/nvd/tests/test_collectors.py b/collectors/nvd/tests/test_collectors.py index ade9e6d31..be4a102a7 100644 --- a/collectors/nvd/tests/test_collectors.py +++ b/collectors/nvd/tests/test_collectors.py @@ -236,6 +236,20 @@ def test_cvss4(self): flaw = Flaw.objects.get(cve_id=cve_id) assert flaw.cvss_scores.filter(version=FlawCVSS.CVSSVersion.VERSION4) + @pytest.mark.vcr + def test_cpe_load(self): + """ + Test that CPE values are correctly loaded in the Flaw model. + """ + cve_id = "CVE-2020-1234" + FlawFactory(cve_id=cve_id) + + nvdc = NVDCollector() + nvdc.collect(cve_id) + + flaw = Flaw.objects.get(cve_id=cve_id) + assert len(flaw.nvd_cpes) > 0 + @pytest.mark.parametrize( "old_flag,new_flag", [ @@ -257,20 +271,6 @@ def test_cvss4(self): ), ], ) - @pytest.mark.vcr - def test_cpe_load(self): - """ - Test that CPE values are correctly loaded in the Flaw model. - """ - cve_id = "CVE-2020-1234" - FlawFactory(cve_id=cve_id) - - nvdc = NVDCollector() - nvdc.collect(cve_id) - - flaw = Flaw.objects.get(cve_id=cve_id) - assert hasattr(flaw, "cpe") - assert False def test_reset_flag_on_removal(self, old_flag, new_flag): """