Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 25 additions & 1 deletion packages/cli/src/opentools/scanner/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,11 @@ def kill_chain_state(self) -> KillChainState:

async def run(self) -> None:
    """Execute the full task DAG."""
    from datetime import datetime, timezone

    # Stamp started_at only on the first invocation; re-runs keep the
    # original start time.
    if self.scan.started_at is None:
        self.scan.started_at = datetime.now(timezone.utc)
    self.scan.status = ScanStatus.RUNNING

    await self._schedule_loop()
    self._finalize()

Expand Down Expand Up @@ -420,14 +424,34 @@ def _skip_dependents(self, failed_task_id: str) -> None:
to_skip.extend(self._dependents.get(dep_id, set()))

def _finalize(self) -> None:
    """Set final scan status and populate summary fields from task outcomes."""
    from datetime import datetime, timezone

    # Final status: cancellation wins, then "anything completed", else failed.
    if self._cancellation.is_cancelled:
        status = ScanStatus.CANCELLED
    elif self._completed:
        status = ScanStatus.COMPLETED
    else:
        status = ScanStatus.FAILED
    self.scan.status = status

    # Map task ids back to tool names (one scan task per tool in the
    # profile). dict.fromkeys deduplicates while keeping first-seen order.
    # A task present in both sets counts as completed, matching the
    # completed-first precedence.
    completed_names = dict.fromkeys(
        task.name
        for task_id, task in self._tasks.items()
        if task_id in self._completed
    )
    failed_names = dict.fromkeys(
        task.name
        for task_id, task in self._tasks.items()
        if task_id not in self._completed and task_id in self._failed
    )
    self.scan.tools_completed = list(completed_names)
    self.scan.tools_failed = list(failed_names)

    # Stamp completed_at only if nothing set it earlier.
    if self.scan.completed_at is None:
        self.scan.completed_at = datetime.now(timezone.utc)

# ------------------------------------------------------------------
# Pipeline processing
# ------------------------------------------------------------------
Expand Down
135 changes: 135 additions & 0 deletions packages/cli/src/opentools/scanner/parsing/parsers/nikto.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
"""Nikto JSON output parser."""

from __future__ import annotations

import hashlib
import uuid
from datetime import datetime, timezone
from typing import Iterator

import orjson

from opentools.scanner.models import (
EvidenceQuality,
LocationPrecision,
RawFinding,
)


_HIGH_PATTERNS = (
"default password",
"default credential",
"directory listing",
"directory indexing",
"remote code execution",
"command injection",
)
_LOW_PATTERNS = (
"server banner",
"version disclosure",
"x-powered-by",
"server leaks",
)


def _classify_severity(msg: str) -> str:
msg_lower = msg.lower()
for pattern in _HIGH_PATTERNS:
if pattern in msg_lower:
return "high"
for pattern in _LOW_PATTERNS:
if pattern in msg_lower:
return "low"
return "medium"


class NiktoParser:
    """Parses Nikto JSON output (`-Format json`) into RawFindings.

    Each vulnerability entry becomes a finding with endpoint-level precision.
    Severity is heuristically assigned based on message patterns.
    """

    name = "nikto"
    version = "1.0.0"
    confidence_tier = 0.6

    def validate(self, data: bytes) -> bool:
        """Accept either a target-result dict or an array of them."""
        try:
            parsed = orjson.loads(data)
        except (orjson.JSONDecodeError, UnicodeDecodeError):
            return False
        if isinstance(parsed, dict) and "vulnerabilities" in parsed:
            return True
        if isinstance(parsed, list):
            return any(
                isinstance(item, dict) and "vulnerabilities" in item
                for item in parsed
            )
        return False

    def parse(
        self,
        data: bytes,
        scan_id: str,
        scan_task_id: str,
    ) -> Iterator[RawFinding]:
        """Yield one RawFinding per Nikto vulnerability entry.

        Malformed or undecodable input yields nothing rather than raising.
        """
        try:
            parsed = orjson.loads(data)
        # Match validate(): undecodable bytes may surface as
        # UnicodeDecodeError, not just JSONDecodeError.
        except (orjson.JSONDecodeError, UnicodeDecodeError):
            return

        # Normalize to a list of target-result dicts.
        if isinstance(parsed, dict):
            targets = [parsed]
        elif isinstance(parsed, list):
            targets = [t for t in parsed if isinstance(t, dict)]
        else:
            return

        for target_result in targets:
            yield from self._parse_target(target_result, scan_id, scan_task_id)

    def _parse_target(
        self,
        parsed: dict,
        scan_id: str,
        scan_task_id: str,
    ) -> Iterator[RawFinding]:
        """Yield findings for a single Nikto target-result dict.

        Missing host/port fields degrade gracefully ("unknown" host,
        port omitted from the location URL).
        """
        host = parsed.get("host") or parsed.get("ip") or "unknown"
        port = str(parsed.get("port", "") or "")

        for vuln in parsed.get("vulnerabilities", []) or []:
            if not isinstance(vuln, dict):
                continue
            # `or` fallbacks guard against explicit null values in the JSON.
            msg = vuln.get("msg", "Unknown finding") or "Unknown finding"
            url = vuln.get("url", "/") or "/"
            method = vuln.get("method", "GET") or "GET"
            nikto_id = str(vuln.get("id", "") or "")
            osvdb = str(vuln.get("OSVDB", "") or "")

            location_url = f"{host}:{port}{url}" if port else f"{host}{url}"
            # Stable hash over tool/id/location/message for deduplication.
            evidence_str = f"nikto:{nikto_id}:{location_url}:{msg}"
            evidence_hash = hashlib.sha256(evidence_str.encode()).hexdigest()

            yield RawFinding(
                id=str(uuid.uuid4()),
                scan_task_id=scan_task_id,
                scan_id=scan_id,
                tool="nikto",
                raw_severity=_classify_severity(msg),
                title=msg,
                description=msg,
                file_path=None,
                url=location_url,
                evidence=f"Method: {method}, Nikto ID: {nikto_id}" + (f", OSVDB: {osvdb}" if osvdb else ""),
                evidence_quality=EvidenceQuality.PATTERN,
                evidence_hash=evidence_hash,
                cwe=None,
                location_fingerprint=location_url,
                location_precision=LocationPrecision.ENDPOINT,
                parser_version=self.version,
                parser_confidence=self.confidence_tier,
                discovered_at=datetime.now(timezone.utc),
            )
111 changes: 111 additions & 0 deletions packages/cli/src/opentools/scanner/parsing/parsers/nuclei.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
"""Nuclei JSONL output parser."""

from __future__ import annotations

import hashlib
import uuid
from datetime import datetime, timezone
from typing import Iterator

import orjson

from opentools.scanner.models import (
EvidenceQuality,
LocationPrecision,
RawFinding,
)


class NucleiParser:
    """Parses Nuclei JSONL output (one JSON object per line) into RawFindings.

    Each matched template becomes a finding with URL-level location precision.
    Severity is mapped directly from Nuclei's severity field.
    """

    name = "nuclei"
    version = "1.0.0"
    confidence_tier = 0.8

    def validate(self, data: bytes) -> bool:
        """At least one non-empty line must parse as a JSON object with 'info'."""
        if not data.strip():
            return False
        for line in data.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                obj = orjson.loads(line)
            except orjson.JSONDecodeError:
                continue
            if isinstance(obj, dict) and "info" in obj:
                return True
        return False

    def parse(
        self,
        data: bytes,
        scan_id: str,
        scan_task_id: str,
    ) -> Iterator[RawFinding]:
        """Yield one RawFinding per parseable JSONL result line.

        Blank lines, malformed JSON, and non-object lines are skipped.
        """
        for line in data.splitlines():
            line = line.strip()
            if not line:
                continue
            try:
                result = orjson.loads(line)
            except orjson.JSONDecodeError:
                continue
            if not isinstance(result, dict):
                continue

            info = result.get("info", {}) or {}
            template_id = result.get("template-id", "unknown")
            severity = str(info.get("severity", "info")).lower()
            matched_at = result.get("matched-at", "") or result.get("host", "")
            host = result.get("host", "")

            # CWE ids may arrive as a list under classification.cwe-id.
            cwe = None
            classification = info.get("classification", {}) or {}
            cwe_list = classification.get("cwe-id") or []
            if isinstance(cwe_list, list) and cwe_list:
                raw_cwe = str(cwe_list[0]).upper()
                if raw_cwe.startswith("CWE-"):
                    cwe = raw_cwe
                else:
                    # The previous normalization used lstrip("cweCWE-:"),
                    # which strips a character *set* (ruff B005) and could
                    # mangle odd ids; extract the numeric id explicitly and
                    # drop entries with no number at all.
                    digits = "".join(ch for ch in raw_cwe if ch.isdigit())
                    cwe = f"CWE-{digits}" if digits else None

            title = info.get("name") or template_id
            description_parts = []
            if info.get("description"):
                description_parts.append(str(info["description"]).strip())
            if matched_at:
                description_parts.append(f"Matched at: {matched_at}")
            description = "\n".join(description_parts) if description_parts else None

            # Stable hash over tool/template/location for deduplication.
            evidence_str = f"nuclei:{template_id}:{matched_at}"
            evidence_hash = hashlib.sha256(evidence_str.encode()).hexdigest()
            location_fp = matched_at or host or template_id

            yield RawFinding(
                id=str(uuid.uuid4()),
                scan_task_id=scan_task_id,
                scan_id=scan_id,
                tool="nuclei",
                raw_severity=severity,
                title=title,
                description=description,
                file_path=None,
                line_start=None,
                line_end=None,
                url=matched_at or host or None,
                evidence=str(result.get("matcher-name", "")) or matched_at or None,
                evidence_quality=EvidenceQuality.STRUCTURED,
                evidence_hash=evidence_hash,
                cwe=cwe,
                location_fingerprint=location_fp,
                location_precision=LocationPrecision.ENDPOINT,
                parser_version=self.version,
                parser_confidence=self.confidence_tier,
                discovered_at=datetime.now(timezone.utc),
            )
76 changes: 76 additions & 0 deletions packages/cli/src/opentools/scanner/parsing/parsers/waybackurls.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
"""Waybackurls plain-text URL list parser.

Waybackurls outputs one URL per line. Each URL becomes an informational
RawFinding representing a historically-known endpoint worth probing
further in subsequent scans. Deduplicated on URL.
"""

from __future__ import annotations

import hashlib
import uuid
from datetime import datetime, timezone
from typing import Iterator

from opentools.scanner.models import (
EvidenceQuality,
LocationPrecision,
RawFinding,
)


class WaybackurlsParser:
    """Parser for waybackurls' plain-text output (one URL per line).

    Each unique http(s) URL becomes an informational RawFinding
    representing a historically-known endpoint worth probing further.
    Deduplicated on the exact URL string.
    """

    name = "waybackurls"
    version = "1.0.0"
    confidence_tier = 0.4

    def validate(self, data: bytes) -> bool:
        """Any non-empty input with at least one URL-shaped line."""
        if not data.strip():
            return False
        # startswith accepts a tuple of prefixes; one check per line.
        return any(
            raw_line.strip().startswith((b"http://", b"https://"))
            for raw_line in data.splitlines()
        )

    def parse(
        self,
        data: bytes,
        scan_id: str,
        scan_task_id: str,
    ) -> Iterator[RawFinding]:
        """Yield one informational RawFinding per unique http(s) URL.

        Non-URL lines are skipped; duplicate URLs are emitted once.
        """
        seen: set[str] = set()
        for raw_line in data.splitlines():
            # errors="replace" never raises, so no try/except is needed
            # (the previous broad `except Exception` here was dead code).
            url = raw_line.decode("utf-8", errors="replace").strip()
            # An empty string fails startswith too, so no separate check.
            if not url.startswith(("http://", "https://")):
                continue
            if url in seen:
                continue
            seen.add(url)

            # Stable hash over tool + URL for deduplication downstream.
            evidence_hash = hashlib.sha256(f"waybackurls:{url}".encode()).hexdigest()

            yield RawFinding(
                id=str(uuid.uuid4()),
                scan_task_id=scan_task_id,
                scan_id=scan_id,
                tool="waybackurls",
                raw_severity="info",
                title=f"Historical URL: {url[:80]}",
                description=f"Discovered via Wayback Machine archives: {url}",
                file_path=None,
                url=url,
                evidence=url,
                evidence_quality=EvidenceQuality.HEURISTIC,
                evidence_hash=evidence_hash,
                cwe=None,
                location_fingerprint=url,
                location_precision=LocationPrecision.ENDPOINT,
                parser_version=self.version,
                parser_confidence=self.confidence_tier,
                discovered_at=datetime.now(timezone.utc),
            )
Loading
Loading