Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ collect_strings_gettext = "scanpipe.pipelines.collect_strings_gettext:CollectStr
collect_symbols_ctags = "scanpipe.pipelines.collect_symbols_ctags:CollectSymbolsCtags"
collect_symbols_pygments = "scanpipe.pipelines.collect_symbols_pygments:CollectSymbolsPygments"
collect_symbols_tree_sitter = "scanpipe.pipelines.collect_symbols_tree_sitter:CollectSymbolsTreeSitter"
collect_symbols_patches = "scanpipe.pipelines.collect_patch_symbols:CollectPatchSymbols"
enrich_with_purldb = "scanpipe.pipelines.enrich_with_purldb:EnrichWithPurlDB"
fetch_scores = "scanpipe.pipelines.fetch_scores:FetchScores"
find_vulnerabilities = "scanpipe.pipelines.find_vulnerabilities:FindVulnerabilities"
Expand Down
30 changes: 30 additions & 0 deletions scanpipe/pipelines/collect_patch_symbols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# ScanCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/scancode.io for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from scanpipe.pipelines import Pipeline
from scanpipe.pipes import symbols


class CollectPatchSymbols(Pipeline):
    """
    Collect the vulnerable and fixed symbols found in vulnerability patches.

    The patch symbols are extracted with the tree-sitter collector, compared
    with the ``source_symbols`` already stored on the matching codebase
    resources, and saved in the ``extra_data`` of those resources.
    """

    download_inputs = False
    is_addon = True
    # Filter on a key this pipeline actually stores in ``extra_data``
    # ("vulnerable_symbols", "fixed_symbols" and "reachability" are written
    # together), otherwise the results link never lists any resource.
    results_url = "/project/{slug}/resources/?extra_data=vulnerable_symbols"

    @classmethod
    def steps(cls):
        return (cls.collect_and_store_patch_symbols_and_strings,)

    def collect_and_store_patch_symbols_and_strings(self):
        """
        Collect vulnerable/fixed symbols from the patches of each package
        vulnerability and match them against the codebase resource symbols.
        """
        symbol_type = "tree_sitter"
        symbols.collect_and_store_patch_symbols(self.project, symbol_type, self.log)
233 changes: 233 additions & 0 deletions scanpipe/pipes/symbols.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,23 @@
# ScanCode.io is a free software code scanning tool from nexB Inc. and others.
# Visit https://github.com/aboutcode-org/scancode.io for support and download.

import os
import tempfile

from django.db.models import Q

from source_inspector import symbols_ctags
from source_inspector import symbols_pygments
from source_inspector import symbols_tree_sitter

from aboutcode.pipeline import LoopProgress
from scanpipe.pipes.fetch import fetch_http
from scanpipe.pipes.pathmap import build_index
from scanpipe.pipes.pathmap import find_paths
from scanpipe.pipes.symbolmap import MATCHING_RATIO_JAVASCRIPT
from scanpipe.pipes.symbolmap import MATCHING_RATIO_JAVASCRIPT_SMALL_FILE
from scanpipe.pipes.symbolmap import SMALL_FILE_SYMBOLS_THRESHOLD_JAVASCRIPT
from scanpipe.pipes.symbolmap import get_similarity_between_source_and_deployed_symbols


class UniversalCtagsNotFound(Exception):
Expand Down Expand Up @@ -171,3 +185,222 @@ def _collect_and_store_tree_sitter_symbols_and_strings(resource):
"source_strings": result.get("source_strings"),
}
)


# Dispatch table: symbol collector name -> ``source_inspector`` function that
# is called with the path of a file on disk. The returned mapping is read
# through its "source_symbols" and "source_strings" keys by the callers below.
SYMBOLS_TYPE_SUPPORTED = {
    "ctags": symbols_ctags.get_symbols,
    "tree_sitter": symbols_tree_sitter.get_treesitter_symbols,
    "pygments": symbols_pygments.get_pygments_symbols,
}

# Lower-case file extensions of patched files that are ignored when collecting
# patch symbols.
DOC_EXTENSIONS = {
    # Documentation and markup.
    ".html",
    ".md",
    ".pdf",
    ".rst",
    ".txt",
    ".wiki",
    # Structured data and configuration.
    ".json",
    ".toml",
    ".yaml",
    ".yml",
}


def get_vulnerability_patch_text(vuln):
    """
    Yield the text of each patch attached to the ``vuln`` vulnerability.

    TODO: this is a mock. The ``vuln`` argument is ignored and the same
    hard-coded Flask patch is fetched on every call. Delete this function once
    we migrate to the VulnerableCode v2 API.
    """
    # Flask 1.0 sdist, presumably the package to scan when trying this mock:
    # https://files.pythonhosted.org/packages/99/ab/eedb921f26adf7057ade1291f9c1bfa35a506d64894f58546457ef658772/Flask-1.0.tar.gz

    patch_urls = [
        # VCID-z6fe-2j8a-aaak
        # "https://github.com/pallets/flask/commit/70f906c51ce49c485f1d355703e9cc3386b1cc2b.patch",
        # "https://github.com/pallets/flask/commit/afd63b16170b7c047f5758eb910c416511e9c965.patch",
        # VCID-e8hf-2zj4-1qhv
        "https://github.com/pallets/flask/commit/089cb86dd22bff589a4eafb7ab8e42dc357623b4.patch"
    ]

    for patch_url in patch_urls:
        file_path = fetch_http(patch_url).path
        # Decode explicitly instead of relying on the locale encoding, and
        # replace undecodable bytes so that one odd byte in a patch does not
        # abort the processing of the whole package.
        with open(file_path, encoding="utf-8", errors="replace") as f:
            patch_text = f.read()
            yield patch_text


def parse_patch_symbols(raw_code: str, path: str, symbols_type="tree_sitter") -> dict:
    """
    Return the symbols data collected from the ``raw_code`` snippet.

    ``path`` is only used for its file extension, which is given to the
    temporary file holding the snippet. ``symbols_type`` is a key of
    ``SYMBOLS_TYPE_SUPPORTED``.

    Return an empty dict when ``raw_code`` is empty or blank, when
    ``symbols_type`` is not supported, or when the collector returns nothing.
    """
    if not raw_code or not raw_code.strip():
        return {}

    parser_func = SYMBOLS_TYPE_SUPPORTED.get(symbols_type)
    if parser_func is None:
        # Unsupported collector: nothing to run, so do not create a file.
        return {}

    _, file_suffix = os.path.splitext(path)

    # ``delete=False`` because the collector is handed the file name, after
    # the file has been written and closed.
    temp_file = tempfile.NamedTemporaryFile(
        mode="w", suffix=file_suffix, delete=False, encoding="utf-8"
    )
    try:
        # Writing is inside the ``try`` so that a failed write cannot leak the
        # temporary file.
        with temp_file:
            temp_file.write(raw_code)
        return parser_func(temp_file.name) or {}
    finally:
        os.remove(temp_file.name)


def get_patch_symbols(vulnerable_files: dict, fixed_files: dict, symbol_type) -> dict:
    """
    Return a mapping of file path to the symbols and strings collected from
    the vulnerable (removed) and fixed (added) code of that file.

    ``vulnerable_files`` and ``fixed_files`` map a file path to a code snippet.
    A path present in only one of them gets empty lists for the other side.
    """

    def _parse_side(files, file_path):
        # A path missing on one side is parsed as an empty snippet.
        return parse_patch_symbols(files.get(file_path, ""), file_path, symbol_type)

    results = {}
    for file_path in set(vulnerable_files).union(fixed_files):
        vulnerable_side = _parse_side(vulnerable_files, file_path)
        fixed_side = _parse_side(fixed_files, file_path)

        entry = {}
        entry["vulnerable_symbols"] = vulnerable_side.get("source_symbols", [])
        entry["vulnerable_strings"] = vulnerable_side.get("source_strings", [])
        entry["fixed_symbols"] = fixed_side.get("source_symbols", [])
        entry["fixed_strings"] = fixed_side.get("source_strings", [])
        results[file_path] = entry

    return results


def _should_skip(file_path: str):
file_name = os.path.basename(file_path)
_, ext = os.path.splitext(file_name)

if ext.lower() in DOC_EXTENSIONS:
return True

lower_name = file_name.lower()
if (
lower_name.startswith("test_")
or lower_name.startswith("test")
or "_test." in lower_name
):
return True

lower_path = file_path.lower()
if "test/" in lower_path or "tests/" in lower_path or "/testdata/" in lower_path:
return True

return False


def extract_patch_details(patch_text: str):
    """
    Return a ``(vulnerable_files, fixed_files)`` tuple of dicts built from the
    unified diff ``patch_text``.

    ``vulnerable_files`` maps a patched file path to its concatenated removed
    lines, and ``fixed_files`` maps it to its concatenated added lines. A file
    without removed (or added) lines has no entry in the matching dict, and
    files rejected by ``_should_skip`` are ignored.
    """
    from unidiff import PatchSet

    vulnerable_files, fixed_files = {}, {}

    for patched_file in PatchSet(patch_text):
        target_path = patched_file.path
        if _should_skip(target_path):
            continue

        # Flatten the hunks once, then split the lines by change type.
        changed_lines = [line for hunk in patched_file for line in hunk]
        removed_values = [line.value for line in changed_lines if line.is_removed]
        added_values = [line.value for line in changed_lines if line.is_added]

        if removed_values:
            vulnerable_files[target_path] = "".join(removed_values)
        if added_values:
            fixed_files[target_path] = "".join(added_values)

    return vulnerable_files, fixed_files


def collect_and_store_patch_symbols(project, symbol_type, logger=None):
    """
    Collect and store the patch symbols of every discovered package of the
    ``project`` using the ``symbol_type`` collector.

    A failure on one package is recorded with ``project.add_error`` and does
    not stop the processing of the remaining packages. The optional ``logger``
    callable receives the start message and is given to ``LoopProgress``.
    """
    discovered_packages = project.discoveredpackages.all()
    total = discovered_packages.count()

    if logger:
        start_message = (
            f"Collecting patch symbols for {total:,d} discovered packages "
            "and computing reachability."
        )
        logger(start_message)

    progress = LoopProgress(total, logger)
    package_iterator = discovered_packages.iterator(chunk_size=2000)
    for package in progress.iter(package_iterator):
        try:
            _collect_and_store_patch_symbols(project, package, symbol_type)
        except Exception as error:
            error_description = (
                f"Cannot collect patch symbols for package {package.name}"
            )
            project.add_error(
                description=error_description,
                exception=error,
                model="collect_and_store_patch_symbols",
                details={"package_uuid": str(package.uuid)},
            )


def calculate_reachability(source_symbols, vulnerable_symbols, fixed_symbols):
    """
    Compare ``source_symbols`` with the ``vulnerable_symbols`` and the
    ``fixed_symbols`` of a patch and return the results as a dict.

    The returned dict has the match flag and similarity for each side, plus
    "is_reachable", which is True when the vulnerable similarity is greater
    than or equal to the fixed similarity (equal scores count as reachable).
    """

    def _compare_with(patch_side_symbols):
        # NOTE(review): the JavaScript thresholds are used for every file,
        # whatever its language -- confirm this is intended.
        return get_similarity_between_source_and_deployed_symbols(
            source_symbols=source_symbols,
            deployed_symbols=patch_side_symbols,
            matching_ratio=MATCHING_RATIO_JAVASCRIPT,
            matching_ratio_small_file=MATCHING_RATIO_JAVASCRIPT_SMALL_FILE,
            small_file_threshold=SMALL_FILE_SYMBOLS_THRESHOLD_JAVASCRIPT,
        )

    vulnerable_matched, vulnerable_score = _compare_with(vulnerable_symbols)
    fixed_matched, fixed_score = _compare_with(fixed_symbols)
    reachable = vulnerable_score >= fixed_score

    return {
        "is_vulnerable_matched": vulnerable_matched,
        "vulnerable_similarity": vulnerable_score,
        "is_fixed_matched": fixed_matched,
        "fixed_similarity": fixed_score,
        "is_reachable": reachable,
    }


def _collect_and_store_patch_symbols(project, package, symbol_type):
    """
    Collect the patch symbols for each vulnerability affecting ``package`` and
    store them, with the reachability results, in the ``extra_data`` of the
    ``project`` codebase resources whose path matches a patched file.

    For each matching resource the "vulnerable_symbols", "fixed_symbols" and
    "reachability" keys of its ``extra_data`` are updated. Patched files that
    match no codebase resource are reported and skipped.
    """
    vulnerabilities = package.affected_by_vulnerabilities
    if not vulnerabilities:
        # Nothing to collect: avoid indexing every resource path for nothing.
        return

    resource_data = project.codebaseresources.values_list("id", "path")
    path_index = build_index(resource_data, with_subpaths=True)

    for vuln in vulnerabilities:
        # TODO: revisit once VulnerableCode has migrated to advisories and the
        # patch API is merged.
        for patch_text in get_vulnerability_patch_text(vuln):
            if not patch_text or not patch_text.strip():
                continue

            vulnerable_files, fixed_files = extract_patch_details(patch_text)
            patch_symbols_data = get_patch_symbols(
                vulnerable_files, fixed_files, symbol_type
            )
            for file_path, patch_symbols in patch_symbols_data.items():
                # ``find_paths`` returns None when no path matches. Treat this
                # like an empty match instead of failing on ``None`` and
                # aborting the remaining files of the package.
                match = find_paths(file_path, path_index)
                matched_resources = None
                if match:
                    matched_resources = project.codebaseresources.filter(
                        id__in=match.resource_ids
                    )
                if not matched_resources:
                    print(f"Failed to get the code base resources: {file_path}")
                    continue

                # The patch symbols are the same for every matched resource.
                vulnerable_symbols = patch_symbols.get("vulnerable_symbols", [])
                fixed_symbols = patch_symbols.get("fixed_symbols", [])

                for resource in matched_resources:
                    resource_symbols = resource.extra_data.get("source_symbols", [])
                    reachability = calculate_reachability(
                        resource_symbols, vulnerable_symbols, fixed_symbols
                    )
                    resource.update_extra_data(
                        {
                            "vulnerable_symbols": vulnerable_symbols,
                            "fixed_symbols": fixed_symbols,
                            "reachability": reachability,
                        }
                    )
Empty file.
Loading