Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
from vulnerabilities.pipelines.v2_importers import nginx_importer as nginx_importer_v2
from vulnerabilities.pipelines.v2_importers import npm_importer as npm_importer_v2
from vulnerabilities.pipelines.v2_importers import nvd_importer as nvd_importer_v2
from vulnerabilities.pipelines.v2_importers import openssl_importer as openssl_importer_v2
from vulnerabilities.pipelines.v2_importers import oss_fuzz as oss_fuzz_v2
from vulnerabilities.pipelines.v2_importers import postgresql_importer as postgresql_importer_v2
from vulnerabilities.pipelines.v2_importers import (
Expand Down Expand Up @@ -119,6 +120,7 @@
ruby.RubyImporter,
apache_kafka.ApacheKafkaImporter,
openssl.OpensslImporter,
openssl_importer_v2.OpenSSLImporterPipeline,
redhat.RedhatImporter,
archlinux.ArchlinuxImporter,
ubuntu.UbuntuImporter,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Generated by Django 4.2.25 on 2026-01-19 06:22

from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0110_pipelineschedule_is_run_once"),
]

operations = [
migrations.AlterField(
model_name="advisoryseverity",
name="scoring_system",
field=models.CharField(
choices=[
("cvssv2", "CVSSv2 Base Score"),
("cvssv3", "CVSSv3 Base Score"),
("cvssv3.1", "CVSSv3.1 Base Score"),
("cvssv4", "CVSSv4 Base Score"),
("rhbs", "RedHat Bugzilla severity"),
("rhas", "RedHat Aggregate severity"),
("archlinux", "Archlinux Vulnerability Group Severity"),
("cvssv3.1_qr", "CVSSv3.1 Qualitative Severity Rating"),
("generic_textual", "Generic textual severity rating"),
("apache_httpd", "Apache Httpd Severity"),
("apache_tomcat", "Apache Tomcat Severity"),
("epss", "Exploit Prediction Scoring System"),
("ssvc", "Stakeholder-Specific Vulnerability Categorization"),
("openssl", "OpenSSL Severity"),
],
help_text="Identifier for the scoring system used. Available choices are: cvssv2: CVSSv2 Base Score,\ncvssv3: CVSSv3 Base Score,\ncvssv3.1: CVSSv3.1 Base Score,\ncvssv4: CVSSv4 Base Score,\nrhbs: RedHat Bugzilla severity,\nrhas: RedHat Aggregate severity,\narchlinux: Archlinux Vulnerability Group Severity,\ncvssv3.1_qr: CVSSv3.1 Qualitative Severity Rating,\ngeneric_textual: Generic textual severity rating,\napache_httpd: Apache Httpd Severity,\napache_tomcat: Apache Tomcat Severity,\nepss: Exploit Prediction Scoring System,\nssvc: Stakeholder-Specific Vulnerability Categorization,\nopenssl: OpenSSL Severity ",
max_length=50,
),
),
migrations.AlterField(
model_name="vulnerabilityseverity",
name="scoring_system",
field=models.CharField(
choices=[
("cvssv2", "CVSSv2 Base Score"),
("cvssv3", "CVSSv3 Base Score"),
("cvssv3.1", "CVSSv3.1 Base Score"),
("cvssv4", "CVSSv4 Base Score"),
("rhbs", "RedHat Bugzilla severity"),
("rhas", "RedHat Aggregate severity"),
("archlinux", "Archlinux Vulnerability Group Severity"),
("cvssv3.1_qr", "CVSSv3.1 Qualitative Severity Rating"),
("generic_textual", "Generic textual severity rating"),
("apache_httpd", "Apache Httpd Severity"),
("apache_tomcat", "Apache Tomcat Severity"),
("epss", "Exploit Prediction Scoring System"),
("ssvc", "Stakeholder-Specific Vulnerability Categorization"),
("openssl", "OpenSSL Severity"),
],
help_text="Identifier for the scoring system used. Available choices are: cvssv2: CVSSv2 Base Score,\ncvssv3: CVSSv3 Base Score,\ncvssv3.1: CVSSv3.1 Base Score,\ncvssv4: CVSSv4 Base Score,\nrhbs: RedHat Bugzilla severity,\nrhas: RedHat Aggregate severity,\narchlinux: Archlinux Vulnerability Group Severity,\ncvssv3.1_qr: CVSSv3.1 Qualitative Severity Rating,\ngeneric_textual: Generic textual severity rating,\napache_httpd: Apache Httpd Severity,\napache_tomcat: Apache Tomcat Severity,\nepss: Exploit Prediction Scoring System,\nssvc: Stakeholder-Specific Vulnerability Categorization,\nopenssl: OpenSSL Severity ",
max_length=50,
),
),
]
181 changes: 181 additions & 0 deletions vulnerabilities/pipelines/v2_importers/openssl_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import logging
from pathlib import Path
from traceback import format_exc as traceback_format_exc
from typing import Iterable

from dateutil.parser import parse
from fetchcode.vcs import fetch_via_vcs
from packageurl import PackageURL
from univers.version_range import OpensslVersionRange

from vulnerabilities import severity_systems
from vulnerabilities.importer import AdvisoryData
from vulnerabilities.importer import AffectedPackageV2
from vulnerabilities.importer import PatchData
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.pipes import openssl
from vulnerabilities.utils import build_description
from vulnerabilities.utils import create_weaknesses_list
from vulnerabilities.utils import get_item
from vulnerabilities.utils import load_json


class OpenSSLImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
"""Import OpenSSL Advisories"""

pipeline_id = "openssl_importer_v2"
spdx_license_expression = "Apache-2.0"
importer_name = "OpenSSL Importer V2"

license_url = "https://github.com/openssl/openssl/blob/master/LICENSE.txt"
repo_url = "git+https://github.com/openssl/release-metadata/"

@classmethod
def steps(cls):
return (
cls.clone,
cls.collect_and_store_advisories,
cls.clean_downloads,
)

def clone(self):
self.log(f"Cloning `{self.repo_url}`")
self.vcs_response = fetch_via_vcs(self.repo_url)
self.advisory_path = Path(self.vcs_response.dest_dir)

def advisories_count(self):
vuln_directory = self.advisory_path / "secjson"
return sum(1 for _ in vuln_directory.glob("CVE-*.json"))

def collect_advisories(self) -> Iterable[AdvisoryData]:
vuln_directory = self.advisory_path / "secjson"

for advisory in vuln_directory.glob("CVE-*.json"):
yield self.to_advisory_data(advisory)

def to_advisory_data(self, file: Path) -> Iterable[AdvisoryData]:
# TODO: Collect the advisory credits, see https://github.com/aboutcode-org/vulnerablecode/issues/2121

affected_packages = []
severities = []
references = []
patches = []
fix_commits = {}
cwe_string = None

data = load_json(file)
advisory_text = file.read_text()
advisory = get_item(data, "containers", "cna")
description = get_item(advisory, "descriptions", 0, "value")
title = advisory.get("title")
date_published = parse(get_item(advisory, "datePublic"))
cve = get_item(data, "cveMetadata", "cveId")
severity_score = get_item(advisory, "metrics", 0, "other", "content", "text")

for reference in get_item(advisory, "references") or []:
ref_name = reference.get("name")
ref_url = reference.get("url")
if not ref_url:
continue

tag = get_item(reference, "tags", 0) or ""
tag = tag.lower()
references.append(openssl.get_reference(ref_name, tag, ref_url))

if tag != "patch":
continue

if not ref_name:
patches.append(PatchData(patch_url=ref_url))
continue

fix_commits[ref_name.split()[0]] = ref_url

for affected in get_item(advisory, "affected", 0, "versions") or []:
if affected.get("status") != "affected":
continue
fixed_by_commit_patches = []
affected_constraints = None
fixed_version = None

try:
affected_constraints, fixed_version = openssl.parse_affected_fixed(affected)
except Exception as e:
self.log(
f"Failed to parse OpenSSL version for: {cve} with error {e!r}:\n{traceback_format_exc()}",
level=logging.ERROR,
)
continue

fixed_version_range = (
OpensslVersionRange.from_versions([fixed_version]) if fixed_version else None
)

affected_version_range = (
OpensslVersionRange(constraints=affected_constraints)
if affected_constraints
else None
)

if fixed_version and (commit_url := fix_commits.get(fixed_version)):
if patch := openssl.get_commit_patch(
url=commit_url,
logger=self.log,
):
fixed_by_commit_patches.append(patch)

affected_packages.append(
AffectedPackageV2(
package=PackageURL(type="openssl", name="openssl"),
affected_version_range=affected_version_range,
fixed_version_range=fixed_version_range,
fixed_by_commit_patches=fixed_by_commit_patches,
)
)

if severity_score:
severities.append(
VulnerabilitySeverity(
system=severity_systems.OPENSSL,
value=severity_score,
url=f"https://openssl-library.org/news/secjson/{cve.lower()}.json",
)
)

if "problemTypes" in advisory:
problem_type = get_item(advisory, "problemTypes", 0, "descriptions", 0)
cwe_string = problem_type.get("cweId")

weaknesses = create_weaknesses_list([cwe_string]) if cwe_string else []

return AdvisoryData(
advisory_id=cve,
aliases=[],
summary=build_description(summary=title, description=description),
date_published=date_published,
affected_packages=affected_packages,
references_v2=references,
severities=severities,
weaknesses=weaknesses,
patches=patches,
url=f"https://github.com/openssl/release-metadata/blob/main/secjson/{cve}.json",
original_advisory_text=advisory_text,
)

def clean_downloads(self):
if self.vcs_response:
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()
101 changes: 101 additions & 0 deletions vulnerabilities/pipes/openssl.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

from urllib.parse import parse_qs
from urllib.parse import urlparse

from univers.version_constraint import VersionConstraint
from univers.versions import OpensslVersion

from vulnerabilities.importer import PackageCommitPatchData
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.models import AdvisoryReference
from vulnerabilities.utils import get_item


def parse_affected_fixed(affected):
impact_lower = affected.get("version")
affected_constraint = []
fixed_version = None

if not impact_lower:
return affected_constraint, fixed_version

if impact_upper := affected.get("lessThan"):
fixed_version = impact_upper
affected_constraint.append(
VersionConstraint(
comparator="<",
version=OpensslVersion(string=impact_upper),
)
)
elif impact_upper := affected.get("lessThanOrEqual"):
affected_constraint.append(
VersionConstraint(
comparator="<=",
version=OpensslVersion(string=impact_upper),
)
)

lower_comp = "=" if not affected_constraint else ">="
affected_constraint.append(
VersionConstraint(
comparator=lower_comp,
version=OpensslVersion(string=impact_lower),
)
)

return affected_constraint, fixed_version


def get_commit_patch(url, logger):
"""Return PackageCommitPatchData from OpenSSL commit url."""

vcs_url = "https://github.com/openssl/openssl/"
hash = None

if url.startswith("https://github.openssl.org/"):
# unknow vcs url, these are instead stored as references.
return

if url.startswith("https://github.com/"):
vcs_url, hash = url.split("/commit/")
elif url.startswith("https://git.openssl.org/"):
parsed = urlparse(url)
params = parse_qs(parsed.query, separator=";")
if "h" in params:
# git.openssl.org has moved to github.com/openssl/openssl
hash = get_item(params, "h", 0)

if not hash:
logger(f"Unsupported commit url {url}")
return

return PackageCommitPatchData(
vcs_url=vcs_url,
commit_hash=hash[:40],
)


def get_reference(reference_name, tag, reference_url):
name = reference_name.lower() if reference_name else ""

ref_type = (
AdvisoryReference.COMMIT
if "commit" in name or tag == "patch"
else AdvisoryReference.ADVISORY
if "advisory" in name
else AdvisoryReference.OTHER
)

return ReferenceV2(
reference_id=reference_name or tag,
reference_type=ref_type,
url=reference_url,
)
14 changes: 13 additions & 1 deletion vulnerabilities/severity_systems.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
#

import dataclasses
from datetime import datetime

from cvss import CVSS2
from cvss import CVSS3
Expand Down Expand Up @@ -185,6 +184,18 @@ def get(self, scoring_elements: str) -> dict:
"Low",
]

OPENSSL = ScoringSystem(
identifier="openssl",
name="OpenSSL Severity",
url="https://openssl-library.org/policies/general/security-policy/",
)
OPENSSL.choices = [
"Critical",
"High",
"Moderate",
"Low",
]


@dataclasses.dataclass(order=True)
class EPSSScoringSystem(ScoringSystem):
Expand Down Expand Up @@ -227,5 +238,6 @@ def get(self, scoring_elements: str):
APACHE_TOMCAT,
EPSS,
SSVC,
OPENSSL,
)
}
Loading