Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions vulnerabilities/api_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from vulnerabilities.models import AdvisoryWeakness
from vulnerabilities.models import CodeFix
from vulnerabilities.models import CodeFixV2
from vulnerabilities.models import DetectionRule
from vulnerabilities.models import ImpactedPackage
from vulnerabilities.models import Package
from vulnerabilities.models import PackageV2
Expand Down Expand Up @@ -1398,3 +1399,29 @@ def lookup(self, request):

qs = self.get_queryset().for_purls([purl]).with_is_vulnerable()
return Response(PackageV3Serializer(qs, many=True, context={"request": request}).data)


class DetectionRuleFilter(filters.FilterSet):
advisory_avid = filters.CharFilter(field_name="advisory__avid", lookup_expr="exact")

rule_text_contains = filters.CharFilter(field_name="rule_text", lookup_expr="icontains")

class Meta:
model = DetectionRule
fields = ["rule_type"]


class DetectionRuleSerializer(serializers.ModelSerializer):
advisory_avid = serializers.ReadOnlyField(source="advisory.avid", allow_null=True)

class Meta:
model = DetectionRule
fields = ["rule_type", "source_url", "rule_metadata", "rule_text", "advisory_avid"]


class DetectionRuleViewSet(viewsets.ReadOnlyModelViewSet):
queryset = DetectionRule.objects.all()
serializer_class = DetectionRuleSerializer
throttle_classes = [AnonRateThrottle, PermissionBasedUserRateThrottle]
filter_backends = [filters.DjangoFilterBackend]
filterset_class = DetectionRuleFilter
30 changes: 30 additions & 0 deletions vulnerabilities/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from django_altcha import AltchaField

from vulnerabilities.models import ApiUser
from vulnerabilities.models import DetectionRuleTypes


class PackageSearchForm(forms.Form):
Expand Down Expand Up @@ -43,6 +44,35 @@ class AdvisorySearchForm(forms.Form):
)


class DetectionRuleSearchForm(forms.Form):
rule_type = forms.ChoiceField(
required=False,
label="Rule Type",
choices=[("", "All")] + DetectionRuleTypes.choices,
initial="",
)

advisory_avid = forms.CharField(
required=False,
label="Advisory avid",
widget=forms.TextInput(
attrs={
"placeholder": "Search by avid: github_osv_importer_v2/GHSA-7g5f-wrx8-5ccf",
}
),
)

rule_text_contains = forms.CharField(
required=False,
label="Rule Text",
widget=forms.TextInput(
attrs={
"placeholder": "Search in rule text",
}
),
)


class ApiUserCreationForm(forms.ModelForm):
"""Support a simplified creation for API-only users directly from the UI."""

Expand Down
2 changes: 2 additions & 0 deletions vulnerabilities/improvers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
)
from vulnerabilities.pipelines.v2_improvers import flag_ghost_packages as flag_ghost_packages_v2
from vulnerabilities.pipelines.v2_improvers import relate_severities
from vulnerabilities.pipelines.v2_improvers import sigma_rules
from vulnerabilities.pipelines.v2_improvers import unfurl_version_range as unfurl_version_range_v2
from vulnerabilities.utils import create_registry

Expand Down Expand Up @@ -74,5 +75,6 @@
compute_advisory_todo.ComputeToDo,
collect_ssvc_trees.CollectSSVCPipeline,
relate_severities.RelateSeveritiesPipeline,
sigma_rules.SigmaRulesImproverPipeline,
]
)
68 changes: 68 additions & 0 deletions vulnerabilities/migrations/0116_detectionrule.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Generated by Django 5.2.11 on 2026-03-07 14:44

import django.db.models.deletion
from django.db import migrations, models


class Migration(migrations.Migration):

dependencies = [
("vulnerabilities", "0115_impactedpackageaffecting_and_more"),
]

operations = [
migrations.CreateModel(
name="DetectionRule",
fields=[
(
"id",
models.AutoField(
auto_created=True, primary_key=True, serialize=False, verbose_name="ID"
),
),
(
"rule_type",
models.CharField(
choices=[
("yara", "Yara"),
("yara-x", "Yara-X"),
("sigma", "Sigma"),
("clamav", "ClamAV"),
("suricata", "Suricata"),
],
help_text="The type of the detection rule content (e.g., YARA, Sigma).",
max_length=50,
),
),
(
"source_url",
models.URLField(
help_text="URL to the original source or reference for this rule.",
max_length=1024,
),
),
(
"rule_metadata",
models.JSONField(
blank=True,
help_text="Additional structured data such as tags, or author information.",
null=True,
),
),
(
"rule_text",
models.TextField(help_text="The content of the detection signature."),
),
(
"advisory",
models.ForeignKey(
blank=True,
null=True,
on_delete=django.db.models.deletion.SET_NULL,
related_name="detection_rules",
to="vulnerabilities.advisoryv2",
),
),
],
),
]
42 changes: 42 additions & 0 deletions vulnerabilities/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3649,3 +3649,45 @@ def __str__(self):

class Meta:
unique_together = ("vector", "source_advisory")


class DetectionRuleTypes(models.TextChoices):
"""Defines the supported formats for security detection rules."""

YARA = "yara", "Yara"
YARA_X = "yara-x", "Yara-X"
SIGMA = "sigma", "Sigma"
CLAMAV = "clamav", "ClamAV"
SURICATA = "suricata", "Suricata"


class DetectionRule(models.Model):
"""
A Detection Rule is code used to identify malicious activity or security threats.
"""

rule_type = models.CharField(
max_length=50,
choices=DetectionRuleTypes.choices,
help_text="The type of the detection rule content (e.g., YARA, Sigma).",
)

source_url = models.URLField(
max_length=1024, help_text="URL to the original source or reference for this rule."
)

rule_metadata = models.JSONField(
null=True,
blank=True,
help_text="Additional structured data such as tags, or author information.",
)

rule_text = models.TextField(help_text="The content of the detection signature.")

advisory = models.ForeignKey(
AdvisoryV2,
related_name="detection_rules",
on_delete=models.SET_NULL,
null=True,
blank=True,
)
126 changes: 126 additions & 0 deletions vulnerabilities/pipelines/v2_improvers/sigma_rules.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# VulnerableCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/aboutcode-org/vulnerablecode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import datetime
from pathlib import Path

import yaml
from aboutcode.pipeline import LoopProgress
from fetchcode.vcs import fetch_via_vcs

from vulnerabilities.models import AdvisoryAlias
from vulnerabilities.models import DetectionRule
from vulnerabilities.models import DetectionRuleTypes
from vulnerabilities.pipelines import VulnerableCodePipeline
from vulnerabilities.utils import find_all_cve


class SigmaRulesImproverPipeline(VulnerableCodePipeline):
pipeline_id = "sigma_rules"
repo_url = "git+https://github.com/SigmaHQ/sigma"
license_url = "https://github.com/SigmaHQ/Detection-Rule-License"

@classmethod
def steps(cls):
return (
cls.clone_repo,
cls.collect_and_store_rules,
cls.clean_downloads,
)

def clone_repo(self):
self.log(f"Cloning `{self.repo_url}`")
self.vcs_response = fetch_via_vcs(self.repo_url)

def collect_and_store_rules(self):
"""
Collect Sigma YAML rules from the destination directory and store/update
them as DetectionRule objects.
"""

base_directory = Path(self.vcs_response.dest_dir)
yaml_files = [
p
for p in base_directory.rglob("**/*.yml")
if not any(part in [".github", "images", "documentation"] for part in p.parts)
]

rules_count = len(yaml_files)
self.log(f"Enhancing the vulnerability with {rules_count:,d} rule records")
progress = LoopProgress(total_iterations=rules_count, logger=self.log)
for file_path in progress.iter(yaml_files):
raw_text = file_path.read_text(encoding="utf-8")
rule_documents = list(yaml.load_all(raw_text, yaml.FullLoader))

rule_metadata = extract_sigma_metadata(rule_documents)
rule_url = f"https://raw.githubusercontent.com/SigmaHQ/sigma/refs/heads/master/{file_path.relative_to(base_directory)}"
cve_ids = find_all_cve(str(file_path))

found_advisories = set()
for cve_id in cve_ids:
try:
alias = AdvisoryAlias.objects.get(alias=cve_id)
for adv in alias.advisories.all():
found_advisories.add(adv)
except AdvisoryAlias.DoesNotExist:
self.log(f"AdvisoryAlias {cve_id}: {file_path.name} not found.")
continue

for adv in found_advisories:
DetectionRule.objects.update_or_create(
rule_text=raw_text,
rule_type=DetectionRuleTypes.SIGMA,
defaults={
"rule_metadata": rule_metadata,
"source_url": rule_url,
"advisory": adv,
},
)

if not found_advisories:
DetectionRule.objects.update_or_create(
rule_text=raw_text,
rule_type=DetectionRuleTypes.SIGMA,
advisory=None,
defaults={
"rule_metadata": rule_metadata,
"source_url": rule_url,
},
)

def clean_downloads(self):
if self.vcs_response:
self.log(f"Removing cloned repository")
self.vcs_response.delete()

def on_failure(self):
self.clean_downloads()


def extract_sigma_metadata(rule_documents):
"""
Extract Sigma metadata from Sigma YAML rules
"""
if not rule_documents:
return None

first_document = rule_documents[0]
metadata = {
"status": first_document.get("status"),
"author": first_document.get("author"),
"date": first_document.get("date"),
"title": first_document.get("title"),
"id": first_document.get("id"),
}

rule_date = metadata.get("date")

if isinstance(rule_date, (datetime.date, datetime.datetime)):
metadata["date"] = rule_date.isoformat()

return metadata
Loading