Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions vulnerabilities/importers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from vulnerabilities.pipelines.v2_importers import apache_kafka_importer as apache_kafka_importer_v2
from vulnerabilities.pipelines.v2_importers import apache_tomcat_importer as apache_tomcat_v2
from vulnerabilities.pipelines.v2_importers import archlinux_importer as archlinux_importer_v2
from vulnerabilities.pipelines.v2_importers import checkpoint_importer as checkpoint_importer_v2
from vulnerabilities.pipelines.v2_importers import collect_fix_commits as collect_fix_commits_v2
from vulnerabilities.pipelines.v2_importers import curl_importer as curl_importer_v2
from vulnerabilities.pipelines.v2_importers import debian_importer as debian_importer_v2
Expand Down Expand Up @@ -88,6 +89,7 @@
[
archlinux_importer_v2.ArchLinuxImporterPipeline,
apache_kafka_importer_v2.ApacheKafkaImporterPipeline,
checkpoint_importer_v2.CheckPointImporterPipeline,
nvd_importer_v2.NVDImporterPipeline,
elixir_security_importer_v2.ElixirSecurityImporterPipeline,
npm_importer_v2.NpmImporterPipeline,
Expand Down
213 changes: 213 additions & 0 deletions vulnerabilities/pipelines/v2_importers/checkpoint_importer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,213 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import datetime
import json
import logging
from typing import Iterable

import dateparser
import requests
from bs4 import BeautifulSoup

from vulnerabilities.importer import AdvisoryDataV2
from vulnerabilities.importer import ReferenceV2
from vulnerabilities.importer import VulnerabilitySeverity
from vulnerabilities.pipelines import VulnerableCodeBaseImporterPipelineV2
from vulnerabilities.severity_systems import GENERIC

logger = logging.getLogger(__name__)

ADVISORY_BASE_URL = "https://advisories.checkpoint.com"
ADVISORY_LIST_URL = "https://advisories.checkpoint.com/advisories/"


class CheckPointImporterPipeline(VulnerableCodeBaseImporterPipelineV2):
    """Collect Check Point security advisories."""

    pipeline_id = "checkpoint_importer"
    spdx_license_expression = "LicenseRef-scancode-proprietary-license"
    license_url = "https://advisories.checkpoint.com/"
    url = ADVISORY_LIST_URL
    precedence = 200

    @classmethod
    def steps(cls):
        # First download the raw advisory rows, then convert and persist them.
        return (cls.fetch, cls.collect_and_store_advisories)

    def fetch(self):
        """Download all advisory rows and cache them on the pipeline instance."""
        self.log(f"Fetch `{self.url}`")
        self.advisories_data = list(fetch_all_advisory_rows(self.log))

    def advisories_count(self):
        """Return the number of advisory rows fetched."""
        return len(self.advisories_data)

    def collect_advisories(self) -> Iterable[AdvisoryDataV2]:
        """Yield an AdvisoryDataV2 for each cached row that parses successfully."""
        yield from filter(None, map(parse_advisory, self.advisories_data))


def get_available_years(soup: BeautifulSoup) -> list:
    """Return sorted list of years from year-navigation links, including current year."""
    found = {
        int(tail)
        for anchor in soup.find_all("a", href=True)
        if "/defense/advisories/public/" in anchor["href"]
        # The year is the last path segment of the navigation link, e.g. ".../2025/".
        for tail in [anchor["href"].rstrip("/").split("/")[-1]]
        if tail.isdigit() and len(tail) == 4
    }
    # The current year is always listed, even when no nav link exists for it yet.
    return sorted(found | {datetime.date.today().year})


def get_total_pages(soup: BeautifulSoup) -> int:
    """Return total page count from pagination links."""
    page_numbers = [
        int(piece)
        for anchor in soup.find_all("a", href=True)
        if "/advisories/page/" in anchor["href"]
        # The page number sits between "/page/" and any trailing query string.
        for piece in [anchor["href"].split("/page/")[-1].split("?")[0].strip("/")]
        if piece.isdigit()
    ]
    # A single-page listing renders no pagination links at all.
    return max(page_numbers, default=1)


def fetch_all_advisory_rows(log_fn) -> Iterable[dict]:
    """Yield row dicts for all advisories across all years and pages."""

    def fetch_text(target_url):
        # Return the response body for target_url, or None after logging a failure.
        try:
            response = requests.get(target_url, timeout=30)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            log_fn(f"Failed to fetch {target_url}: {e}")
            return None
        return response.text

    # The landing page carries the year-navigation links.
    landing_html = fetch_text(ADVISORY_LIST_URL)
    if landing_html is None:
        return

    years = get_available_years(BeautifulSoup(landing_html, features="lxml"))

    for year in years:
        first_page_html = fetch_text(f"{ADVISORY_LIST_URL}?year={year}")
        if first_page_html is None:
            continue

        year_soup = BeautifulSoup(first_page_html, features="lxml")
        total_pages = get_total_pages(year_soup)
        yield from parse_table_rows(first_page_html)

        for page in range(2, total_pages + 1):
            page_html = fetch_text(f"{ADVISORY_LIST_URL}page/{page}/?year={year}")
            if page_html is None:
                # Stop paging this year on the first failed page request.
                break
            yield from parse_table_rows(page_html)


def parse_table_rows(html: str) -> list:
    """Return list of row dicts from the advisories table HTML."""
    soup = BeautifulSoup(html, features="lxml")
    table = soup.find("table", {"id": "cp_advisory_table_sorter"})
    if not table:
        return []

    def row_to_dict(cells):
        # Return a row dict, or None when the row lacks a CPAI advisory link.
        cpai_link = cells[3].find("a")
        if not cpai_link:
            return None

        advisory_id = cpai_link.get_text(strip=True)
        href = cpai_link.get("href", "")
        # Relative hrefs are made absolute against the advisories site root.
        advisory_url = f"{ADVISORY_BASE_URL}{href}" if href.startswith("/") else href

        cve_source = cells[5].find("a") or cells[5]
        # Drop any trailing annotation such as " (...)" after the CVE id.
        cve_id = cve_source.get_text(strip=True).split(" (")[0].strip()

        summary_source = cells[6].find("a") or cells[6]

        return {
            "advisory_id": advisory_id,
            "advisory_url": advisory_url,
            "cve_id": cve_id,
            "severity": cells[0].get_text(strip=True),
            "date_published": cells[1].get_text(strip=True),
            "summary": summary_source.get_text(strip=True),
        }

    rows = []
    # Skip the header row; ignore rows without the expected seven columns.
    for table_row in table.find_all("tr")[1:]:
        cells = table_row.find_all("td")
        if len(cells) < 7:
            continue
        row = row_to_dict(cells)
        if row is not None:
            rows.append(row)

    return rows


def parse_advisory(row_data: dict):
    """Return AdvisoryDataV2 from a row data dict, or None if advisory_id is missing."""
    advisory_id = row_data.get("advisory_id") or ""
    # Only Check Point "CPAI-" identifiers count as valid advisories
    # (an empty id also fails this prefix check).
    if not advisory_id.startswith("CPAI-"):
        return None

    raw_date = row_data.get("date_published") or ""
    date_published = None
    if raw_date:
        # Normalize to a timezone-aware UTC datetime.
        date_published = dateparser.parse(
            raw_date,
            settings={"TIMEZONE": "UTC", "RETURN_AS_TIMEZONE_AWARE": True, "TO_TIMEZONE": "UTC"},
        )
        if date_published is None:
            logger.warning("Could not parse date %r for %s", raw_date, advisory_id)

    cve_id = row_data.get("cve_id") or ""
    has_cve = cve_id.startswith("CVE-")
    aliases = [cve_id] if has_cve else []

    advisory_url = row_data.get("advisory_url") or ""
    references = []
    if advisory_url:
        references.append(ReferenceV2(url=advisory_url, reference_id=advisory_id))
    if has_cve:
        # Link the CVE to its NVD entry as a secondary reference.
        references.append(
            ReferenceV2(
                url=f"https://nvd.nist.gov/vuln/detail/{cve_id}",
                reference_id=cve_id,
            )
        )

    severity = row_data.get("severity") or ""
    severities = [VulnerabilitySeverity(system=GENERIC, value=severity)] if severity else []

    return AdvisoryDataV2(
        advisory_id=advisory_id,
        aliases=aliases,
        summary=row_data.get("summary") or "",
        affected_packages=[],
        references=references,
        date_published=date_published,
        weaknesses=[],
        severities=severities,
        url=advisory_url,
        original_advisory_text=json.dumps(row_data, indent=2, ensure_ascii=False),
    )
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#

import datetime
from pathlib import Path
from unittest import TestCase
from unittest.mock import MagicMock
from unittest.mock import patch

import requests
from bs4 import BeautifulSoup

from vulnerabilities.pipelines.v2_importers.checkpoint_importer import CheckPointImporterPipeline
from vulnerabilities.pipelines.v2_importers.checkpoint_importer import get_available_years
from vulnerabilities.pipelines.v2_importers.checkpoint_importer import get_total_pages
from vulnerabilities.pipelines.v2_importers.checkpoint_importer import parse_advisory
from vulnerabilities.pipelines.v2_importers.checkpoint_importer import parse_table_rows
from vulnerabilities.tests import util_tests

# Directory holding the saved Check Point advisory HTML/JSON fixtures.
TEST_DATA = Path(__file__).parent.parent.parent / "test_data" / "checkpoint"

# Saved advisories listing page, parsed once at import time and shared by all tests.
with open(TEST_DATA / "advisories_2026.html") as f:
    SAMPLE_HTML = f.read()

SAMPLE_ROWS = parse_table_rows(SAMPLE_HTML)


class TestGetAvailableYears(TestCase):
    """Tests for year extraction from the advisories navigation links."""

    def test_extracts_years_from_nav_links(self):
        nav_soup = BeautifulSoup(SAMPLE_HTML, features="lxml")
        extracted = get_available_years(nav_soup)
        self.assertIn(2024, extracted)
        self.assertIn(2025, extracted)
        self.assertIn(datetime.date.today().year, extracted)

    def test_always_includes_current_year(self):
        empty_soup = BeautifulSoup("<html></html>", features="lxml")
        # Even with no navigation links, the current year must be returned.
        self.assertEqual(get_available_years(empty_soup), [datetime.date.today().year])


class TestGetTotalPages(TestCase):
    """Tests for pagination-count extraction."""

    def test_extracts_max_page_from_pagination(self):
        listing_soup = BeautifulSoup(SAMPLE_HTML, features="lxml")
        self.assertEqual(get_total_pages(listing_soup), 2)

    def test_returns_one_when_no_pagination(self):
        bare_soup = BeautifulSoup("<html></html>", features="lxml")
        # No pagination links means a single page.
        self.assertEqual(get_total_pages(bare_soup), 1)


class TestParseTableRows(TestCase):
    """Tests for extracting row dicts from the advisories table fixture."""

    def test_parses_three_rows(self):
        self.assertEqual(len(SAMPLE_ROWS), 3)

    def test_first_row_advisory_id(self):
        self.assertEqual(SAMPLE_ROWS[0]["advisory_id"], "CPAI-2026-1780")

    def test_first_row_cve_id(self):
        self.assertEqual(SAMPLE_ROWS[0]["cve_id"], "CVE-2026-20122")

    def test_first_row_severity(self):
        self.assertEqual(SAMPLE_ROWS[0]["severity"], "Medium")

    def test_first_row_date(self):
        self.assertEqual(SAMPLE_ROWS[0]["date_published"], "17 Mar 2026")

    def test_first_row_summary(self):
        self.assertIn("Cisco Catalyst", SAMPLE_ROWS[0]["summary"])

    def test_first_row_advisory_url(self):
        self.assertTrue(SAMPLE_ROWS[0]["advisory_url"].endswith("cpai-2026-1780.html"))

    def test_cve_id_stripped_of_extra_text(self):
        self.assertEqual(SAMPLE_ROWS[2]["cve_id"], "CVE-2025-33603")

    def test_returns_empty_list_for_missing_table(self):
        self.assertEqual(parse_table_rows("<html></html>"), [])


def test_parse_advisories():
    """Parsed fixture rows must match the recorded expected-JSON snapshot."""
    parsed = [
        advisory.to_dict()
        for advisory in map(parse_advisory, SAMPLE_ROWS)
        if advisory
    ]
    expected_file = TEST_DATA / "advisories_2026-expected.json"
    util_tests.check_results_against_json(parsed, expected_file)


def test_missing_id_returns_none():
    """Rows without a valid CPAI identifier must be rejected."""
    for bad_row in ({}, {"advisory_id": ""}, {"advisory_id": "INVALID-123"}):
        assert parse_advisory(bad_row) is None


class TestCheckPointImporterPipeline(TestCase):
    """End-to-end tests for the pipeline's fetch and collect steps."""

    def setUp(self):
        self.pipeline = CheckPointImporterPipeline()
        self.pipeline.advisories_data = SAMPLE_ROWS

    def test_advisories_count(self):
        self.assertEqual(self.pipeline.advisories_count(), 3)

    def test_collect_advisories_yields_all_valid(self):
        collected = list(self.pipeline.collect_advisories())
        self.assertEqual(len(collected), 3)

    @patch("vulnerabilities.pipelines.v2_importers.checkpoint_importer.requests.get")
    def test_fetch_stores_advisory_rows(self, mock_get):
        fake_response = MagicMock()
        fake_response.text = SAMPLE_HTML
        mock_get.return_value = fake_response
        self.pipeline.fetch()
        self.assertGreater(len(self.pipeline.advisories_data), 0)

    @patch("vulnerabilities.pipelines.v2_importers.checkpoint_importer.requests.get")
    def test_fetch_handles_request_error(self, mock_get):
        # Every request failing must leave an empty result, not raise.
        mock_get.side_effect = requests.exceptions.RequestException("timeout")
        self.pipeline.fetch()
        self.assertEqual(self.pipeline.advisories_data, [])
Loading