Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 138 additions & 0 deletions benchmarks/policy_override_attribution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
"""Per-rule attribution: which of the 8 policy_overrides actually fire,
on which queries, and do they change the outcome vs. lex-only baseline?

For every corpus query, resolve it under BOTH configs ("lex_only" and "both")
and compare the top result. When they differ, report which policy_override
rule(s) matched the query.
"""
import json
import shutil
import sys
from pathlib import Path
from collections import defaultdict

import microresolve

PACK_NAME = "eu-ai-act-prohibited"
PACK_SRC = Path("packs") / PACK_NAME
CORPUS = Path("_internal/EU_AI_ACT_EVAL_CORPUS.json")
TARGET_THRESHOLD = 1.5


def stage(config: str, root: Path) -> Path:
    """Copy the pack into a fresh ``root/<config>`` data dir and return it.

    Any existing ``root/<config>`` directory is deleted first. The pack is
    copied from ``PACK_SRC`` to ``root/<config>/<PACK_NAME>`` and its
    ``_ns.json`` is re-serialized in place; for the ``"lex_only"`` config the
    ``policy_overrides`` key is dropped before it is written back.

    Args:
        config: Config name; also used as the sub-directory name under
            ``root``. Only ``"lex_only"`` changes the staged content.
        root: Parent directory that holds all staged configs.

    Returns:
        The staged ``root/<config>`` directory.
    """
    cfg_root = root / config
    if cfg_root.exists():
        shutil.rmtree(cfg_root)
    cfg_root.mkdir(parents=True)
    dest = cfg_root / PACK_NAME
    shutil.copytree(PACK_SRC, dest)
    ns_path = dest / "_ns.json"
    with open(ns_path, encoding="utf-8") as fh:
        ns = json.load(fh)
    if config == "lex_only":
        ns.pop("policy_overrides", None)
    # Close the handle before returning so the rewritten file is fully
    # flushed by the time a caller loads this directory.
    with open(ns_path, "w", encoding="utf-8") as fh:
        json.dump(ns, fh, indent=2)
    return cfg_root


def top_intent_at_threshold(ns, query):
    """Return the id of the first resolved intent scoring >= TARGET_THRESHOLD.

    Intents are scanned in the order ``ns.resolve(query).intents`` yields
    them; ``None`` is returned when no intent reaches the threshold.
    """
    for candidate in ns.resolve(query).intents:
        if candidate.score >= TARGET_THRESHOLD:
            return candidate.id
    return None


def main():
    """Resolve the corpus under both configs and print a per-rule report.

    Stages a "lex_only" copy of the pack (``policy_overrides`` removed) and a
    "both" copy under ``/tmp/policy_attribution``, resolves every prohibited
    and benign corpus query against each, and records the queries whose
    thresholded top intent differs between the two. Each differing query is
    attributed to every policy-override rule whose words all occur in the
    lowercased query text. Results are printed to stdout.
    """
    with open(CORPUS, encoding="utf-8") as fh:
        corpus = json.load(fh)
    root = Path("/tmp/policy_attribution")
    if root.exists():
        shutil.rmtree(root)
    root.mkdir(parents=True)

    e_lex = microresolve.MicroResolve(data_dir=str(stage("lex_only", root)))
    e_both = microresolve.MicroResolve(data_dir=str(stage("both", root)))
    ns_lex = e_lex.namespace(PACK_NAME)
    ns_both = e_both.namespace(PACK_NAME)

    # Load the policy rules from the unmodified pack so we can match them
    with open(PACK_SRC / "_ns.json", encoding="utf-8") as fh:
        rules = json.load(fh)["policy_overrides"]
    print(f"Loaded {len(rules)} policy override rules:\n")
    for i, r in enumerate(rules):
        print(f" [{i}] {r['words']} → {r['intent']} (bonus={r['bonus']})")
    print()

    # Examine every query
    diff_prohibited = []  # (query, ground_truth, lex_top, both_top)
    diff_benign = []  # (query, lex_top, both_top)
    # rule_idx -> [(query, lex_top, both_top, corpus_section, ground_truth)]
    rule_fires = defaultdict(list)

    def match_rule(query_lower):
        """Return indices of rules whose words ALL appear in the lowercased query."""
        # Plain substring containment; the rule words themselves are not
        # lowercased here (presumably already lowercase in the pack).
        return [
            i
            for i, r in enumerate(rules)
            if all(w in query_lower for w in r["words"])
        ]

    for entry in corpus["prohibited"]:
        q = entry["text"]
        gt = entry["expected_intent"]
        a = top_intent_at_threshold(ns_lex, q)
        b = top_intent_at_threshold(ns_both, q)
        if a != b:
            diff_prohibited.append((q, gt, a, b))
            # Only queries whose outcome flipped are attributed to rules.
            for ri in match_rule(q.lower()):
                rule_fires[ri].append((q, a, b, "prohibited", gt))

    for entry in corpus["benign"]:
        q = entry["text"]
        a = top_intent_at_threshold(ns_lex, q)
        b = top_intent_at_threshold(ns_both, q)
        if a != b:
            diff_benign.append((q, a, b))
            for ri in match_rule(q.lower()):
                rule_fires[ri].append((q, a, b, "benign", None))

    print("=" * 72)
    print("Queries where lex-only vs both DISAGREE:")
    print(f" prohibited diffs: {len(diff_prohibited)}")
    print(f" benign diffs: {len(diff_benign)}")
    print()
    print("Per-rule firing count (rules that flipped an outcome):")
    print()
    for i, r in enumerate(rules):
        n = len(rule_fires[i])
        label = f"[{i}] {' + '.join(r['words'])} → {r['intent']}"
        flag = "" if n > 0 else " ← DEAD: never fired"
        print(f" {n:2d} {label:60s}{flag}")
    print()

    print("=" * 72)
    print("Diff examples (queries where the addition of policy_overrides changed the result):")
    print()
    print("--- benign (policy helps reject false-positives) ---")
    for q, a, b in diff_benign[:10]:
        marker = "✓" if b == "legitimate_use" or b is None else "?"
        print(f" {marker} '{q[:80]}'")
        print(f" lex_only: {a} → both: {b}")
    print()
    print("--- prohibited (policy changes which prohibited intent is picked, or routes to legitimate_use) ---")
    for q, gt, a, b in diff_prohibited[:10]:
        wrong = " ⚠ moved away from ground truth" if a == gt and b != gt else ""
        helps = " ✓ moved toward ground truth" if a != gt and b == gt else ""
        print(f" '{q[:80]}'")
        print(f" gt={gt} lex_only: {a} → both: {b}{wrong}{helps}")

    print()
    print("=" * 72)
    print("Summary:")
    # Keys of rule_fires are always valid rule indices, so counting the
    # non-empty buckets equals counting rules that fired at least once.
    fired_n = sum(1 for hits in rule_fires.values() if hits)
    print(f" Rules that ever fired on a query that flipped outcome: {fired_n} / {len(rules)}")

    # Net benign FP reduction
    benign_flips_to_legit = sum(
        1 for _q, _a, b in diff_benign if b == "legitimate_use" or b is None
    )
    benign_flips_other = len(diff_benign) - benign_flips_to_legit
    print(" Benign queries that flipped:")
    print(f" to 'legitimate_use' or NoMatch (helpful): {benign_flips_to_legit}")
    print(f" to a different prohibited intent (concerning): {benign_flips_other}")


# Script entry point: run the attribution report only when executed directly.
if __name__ == "__main__":
    main()
160 changes: 160 additions & 0 deletions benchmarks/real_test_emotion_language.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
"""Real tests for emotion-detection and language-detect packs.

Self-seed accuracy is trivially circular. The honest questions:

LANGUAGE-DETECT — does it actually route foreign text to the right language?
Feed 32 non-English samples (8 each: es, fr, de, ja) and check routing.

EMOTION-DETECTION — can it disambiguate close emotions from overlapping vocab?
Feed hand-crafted unambiguous emotion queries and check routing.
"""
import json, shutil
from pathlib import Path
import microresolve

# Minimum score an intent needs to be accepted as the routed (top-1) result.
THRESHOLD = 1.5
# Scratch directory for the staged packs; wiped and recreated on every run
# so each run starts from a clean copy.
ROOT = Path("/tmp/real_test")
if ROOT.exists():
    shutil.rmtree(ROOT)
ROOT.mkdir(parents=True)

# ───────────────────────────────────────────────────────────────────────
# LANGUAGE-DETECT — feed actual non-English text
# ───────────────────────────────────────────────────────────────────────
print("=" * 72)
print("LANGUAGE-DETECT — real test on non-English text")
print("=" * 72)

# Stage the pack at ROOT/language-detect/language-detect: the outer directory
# is handed to MicroResolve as data_dir, the inner one carries the namespace
# name that is looked up right after.
shutil.copytree("packs/language-detect", ROOT / "language-detect" / "language-detect")
ns = microresolve.MicroResolve(data_dir=str(ROOT / "language-detect")).namespace("language-detect")

# 8 sample sentences per language (32 in total). Each key is the language
# name; the scoring loop expects the intent id "detect_<key>" for its samples.
LANG_PROBES = {
    "spanish": [
        "buenos días, ¿cómo está usted hoy?",
        "me gustaría reservar una mesa para dos personas",
        "el clima está muy bueno esta tarde",
        "no entiendo lo que dijiste",
        "pueden enviar la factura por correo electrónico",
        "quiero cancelar mi suscripción",
        "tengo una pregunta sobre el pedido",
        "gracias por su ayuda",
    ],
    "french": [
        "bonjour, comment allez-vous aujourd'hui",
        "je voudrais réserver une table pour deux",
        "le temps est très beau cet après-midi",
        "je ne comprends pas ce que vous dites",
        "pouvez-vous envoyer la facture par email",
        "je veux annuler mon abonnement",
        "j'ai une question concernant ma commande",
        "merci beaucoup pour votre aide",
    ],
    "german": [
        "guten tag, wie geht es ihnen heute",
        "ich möchte einen tisch für zwei reservieren",
        "das wetter ist heute sehr schön",
        "ich verstehe nicht was sie sagen",
        "können sie die rechnung per email schicken",
        "ich möchte mein abonnement kündigen",
        "ich habe eine frage zu meiner bestellung",
        "vielen dank für ihre hilfe",
    ],
    "japanese": [
        "こんにちは、お元気ですか",
        "二名でテーブルを予約したいです",
        "今日の天気は素晴らしいです",
        "あなたの言っていることがわかりません",
        "領収書をメールで送ってもらえますか",
        "サブスクリプションをキャンセルしたいです",
        "注文について質問があります",
        "ご協力ありがとうございます",
    ],
}

# Score the language probes: a probe is correct when the first intent at or
# above THRESHOLD is "detect_<language>"; everything else is logged as a miss.
correct = 0
total = 0
errors = []
for true_lang, samples in LANG_PROBES.items():
    expected = f"detect_{true_lang}"
    misses_before = len(errors)
    for text in samples:
        result = ns.resolve(text)
        winner = None
        for candidate in result.intents:
            if candidate.score >= THRESHOLD:
                winner = candidate
                break
        winner_id = winner.id if winner else "—"
        if winner_id != expected:
            errors.append((text, expected, winner_id, winner.score if winner else 0))
    # Hits for this language = probes minus the misses recorded just now.
    pack_hit = len(samples) - (len(errors) - misses_before)
    correct += pack_hit
    total += len(samples)
    print(f" {true_lang:10s}: {pack_hit}/{len(samples)} routed to {expected}")

print(f"\n TOTAL: {correct}/{total} = {correct/total:.1%}")
first_misses = errors[:5]
if first_misses:
    print("\n First 5 mis-routes:")
    for text, want, got, miss_score in first_misses:
        print(f" '{text[:50]}' → expected {want}, got {got} ({miss_score:.2f})")

# ───────────────────────────────────────────────────────────────────────
# EMOTION-DETECTION — hand-crafted, unambiguous in-domain probes
# ───────────────────────────────────────────────────────────────────────
print()
print("=" * 72)
print("EMOTION-DETECTION — disambiguation test on unambiguous queries")
print("=" * 72)

# Stage the pack at ROOT/emotion-detection/emotion-detection: the outer
# directory is handed to MicroResolve as data_dir, the inner one carries the
# namespace name that is looked up right after.
shutil.copytree("packs/emotion-detection", ROOT / "emotion-detection" / "emotion-detection")
ns2 = microresolve.MicroResolve(data_dir=str(ROOT / "emotion-detection")).namespace("emotion-detection")

# (query, expected intent id) pairs, grouped by emotion: 20 probes in total,
# three per intent except distressed_urgent, which has two.
EMOTION_PROBES = [
    # clearly anxious
    ("i'm really worried this won't work out before the deadline", "anxious_worried"),
    ("i'm scared something bad might happen", "anxious_worried"),
    ("i can't stop worrying about the surgery tomorrow", "anxious_worried"),
    # clearly frustrated / angry
    ("this is the third time the app crashed, i'm so angry", "frustrated_angry"),
    ("absolute joke of a service, fix your bugs", "frustrated_angry"),
    ("furious that my package still hasn't arrived", "frustrated_angry"),
    # confused
    ("i have no idea how to set up this thing", "confused_lost"),
    ("the instructions don't make any sense to me", "confused_lost"),
    ("which button should i click i'm totally lost", "confused_lost"),
    # disappointed
    ("expected so much better from this product", "disappointed_let_down"),
    ("really let down by the customer service today", "disappointed_let_down"),
    ("thought this would be great but i was wrong", "disappointed_let_down"),
    # distressed / urgent
    ("emergency, i need help right now please", "distressed_urgent"),
    ("urgent — my account has been hacked", "distressed_urgent"),
    # satisfied
    ("absolutely love this, exactly what i wanted", "satisfied_positive"),
    ("five stars, very happy with the experience", "satisfied_positive"),
    ("perfect product, exactly as described", "satisfied_positive"),
    # neutral
    ("what time does the store open", "neutral_informational"),
    ("which version of the software do i need", "neutral_informational"),
    ("how do i reset my password", "neutral_informational"),
]

# Score the emotion probes. top-1 uses the first intent at or above THRESHOLD;
# top-3 looks at the first three returned intents regardless of their score.
e_correct = 0
e_top3 = 0
e_errors = []
for text, wanted in EMOTION_PROBES:
    result = ns2.resolve(text)
    winner = None
    for candidate in result.intents:
        if candidate.score >= THRESHOLD:
            winner = candidate
            break
    winner_id = winner.id if winner else "—"
    leading_ids = [candidate.id for candidate in result.intents[:3]]
    if wanted in leading_ids:
        e_top3 += 1
    if winner_id == wanted:
        e_correct += 1
    else:
        e_errors.append((text, wanted, winner_id, winner.score if winner else 0))

n_probes = len(EMOTION_PROBES)
print(f" top-1: {e_correct}/{n_probes} = {e_correct/n_probes:.1%}")
print(f" top-3: {e_top3}/{n_probes} = {e_top3/n_probes:.1%}")
if e_errors:
    print("\n Mis-routes:")
    for text, wanted, got, miss_score in e_errors:
        print(f" '{text[:55]}' expected={wanted} got={got} ({miss_score:.2f})")
Loading
Loading