Quick Python implementations to mitigate common AI security threats.
Strip risky keywords from prompts:

```python
import re

# Remove common injection keywords (word-bounded, so "systematic" survives)
sanitized_prompt = re.sub(r'\b(ignore|override|bypass|system|admin|root)\b', '',
                          user_input, flags=re.IGNORECASE)
```

Detect common injection patterns:

```python
injection_patterns = ['ignore', 'override', 'system prompt', 'instructions', 'tell me']
is_injection = any(pattern in user_input.lower() for pattern in injection_patterns)
```

Parameterize prompts so input cannot rewrite the template:

```python
# Strip template braces so user input cannot inject format fields
escaped_input = user_input.replace('{', '').replace('}', '')
safe_prompt = f"Analyze the following user query: '{escaped_input}' with role: {safe_role}"
```

Verify training-data integrity with a hash:

```python
import hashlib

data_hash = hashlib.sha256(str(training_data).encode()).hexdigest()
assert data_hash == expected_hash, "Data integrity compromised"
```

Flag outlier labels that may indicate poisoning:

```python
import numpy as np

# Labels more than 3 standard deviations from the median are suspect
labels_arr = np.asarray(labels)
outliers = np.where(np.abs(labels_arr - np.median(labels_arr)) > 3 * np.std(labels_arr))[0]
```

Version data checksums for auditability:

```python
import json

with open(f'training_data_v{version}.json', 'w') as f:
    json.dump(data_checksum, f)
```

Add Laplace noise for differential privacy:

```python
import numpy as np

# Laplace mechanism: the noise scale is sensitivity / epsilon
# (smaller epsilon means stronger privacy and more noise)
noisy_prediction = prediction + np.random.laplace(0, sensitivity / epsilon, prediction.shape)
```
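A reusable wrapper makes the privacy budget explicit. This is a minimal sketch; the helper name `privatize` is an illustration, not an established API:

```python
import numpy as np

def privatize(values, sensitivity, epsilon):
    # Smaller epsilon = stronger privacy = more noise
    scale = sensitivity / epsilon
    return values + np.random.laplace(0.0, scale, np.shape(values))
```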
Throttle callers with a rate-limiting decorator:

```python
import time
from functools import wraps

def rate_limit(func):
    last_called = [0.0]

    @wraps(func)  # wraps decorates the inner wrapper, applied with the target func
    def wrapper(*args, **kwargs):
        elapsed = time.time() - last_called[0]
        wait = 1.0 - elapsed  # allow at most one call per second
        if wait > 0:
            time.sleep(wait)
        last_called[0] = time.time()
        return func(*args, **kwargs)

    return wrapper
```
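In use, the decorator wraps whatever endpoint you want throttled. A quick sketch; the function name and `model` object are assumptions:

```python
# Throttle a hypothetical inference endpoint to roughly one call per second
@rate_limit
def run_inference(prompt):
    return model.predict(prompt)  # `model` is assumed to exist in scope
```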
Hide confidence scores when clients do not need them:

```python
prediction_output = ({"class": predicted_class} if hide_confidence
                     else {"class": predicted_class, "confidence": confidence_score})
```

Detect adversarial inputs by perturbation size:

```python
import numpy as np

# Relative L1 distance from a trusted baseline input
perturbation_score = np.sum(np.abs(current_input - baseline_input)) / np.sum(np.abs(baseline_input))
is_adversarial = perturbation_score > threshold
```
Normalize inputs with a scaler fitted on trusted data:

```python
from sklearn.preprocessing import StandardScaler

# Fit on trusted training data, then transform incoming input;
# fitting on a single sample would zero everything out
scaler = StandardScaler().fit(X_train)
normalized_input = scaler.transform([raw_input])[0]
```

Use an ensemble majority vote:

```python
predictions = [model1.predict(x), model2.predict(x), model3.predict(x)]
final_prediction = max(set(predictions), key=predictions.count)  # majority vote
```

Screen outputs for harmful keywords:

```python
harmful_keywords = ['illegal', 'harm', 'violence', 'bypass']
contains_harmful = any(keyword in output.lower() for keyword in harmful_keywords)
```
Classify outputs with a zero-shot model:

```python
from transformers import pipeline

classifier = pipeline("zero-shot-classification")
result = classifier(output, ["safe", "unsafe"])
is_safe = result['labels'][0] == 'safe' and result['scores'][0] > 0.9
```

Chain multiple output filters:

```python
from better_profanity import profanity

filters = [
    lambda x: len(x) > 5000,                                  # excessive length
    lambda x: any(w in x.lower() for w in harmful_keywords),  # harmful keywords
    lambda x: profanity.contains_profanity(x),                # profanity
]
output_safe = not any(filter_func(output) for filter_func in filters)
```
Check frame-to-frame consistency in video:

```python
import cv2
import numpy as np

# Read consecutive frames from a single capture object; reopening the
# capture per frame would return the same first frame every time
cap = cv2.VideoCapture(video_path)
frames = [cap.read()[1] for _ in range(10)]
cap.release()
consistency_score = np.mean([
    np.corrcoef(frames[i].flatten(), frames[i + 1].flatten())[0, 1]
    for i in range(len(frames) - 1)
])
```
Score images with a deepfake detector:

```python
# `deepfake_detector` stands in for whichever pretrained detector you deploy;
# there is no standard DeepFake.predict() API in common libraries
prediction = deepfake_detector.predict(image_path)
is_deepfake = prediction['label'] == 'FAKE' and prediction['confidence'] > threshold
```
Inspect audio features for synthesis artifacts:

```python
import librosa
import numpy as np

y, sr = librosa.load(audio_path)
mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
deepfake_score = np.std(mfcc)  # crude heuristic: unusual MFCC variance can indicate synthesis
```
Verify package hashes against a manifest:

```python
import hashlib

with open(package_path, 'rb') as f:
    package_hash = hashlib.sha256(f.read()).hexdigest()
is_valid = package_hash == package_manifest[package_name]
```

Audit dependencies for known vulnerabilities:

```python
import subprocess

# pip-audit exits non-zero when it finds vulnerabilities, so check_output
# (which raises on non-zero exit) would turn findings into exceptions
result = subprocess.run(['pip-audit'], capture_output=True, text=True)
vulnerable_packages = result.stdout.splitlines()
```
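For machine-readable results, pip-audit also offers JSON output. A sketch; the exact shape of the report varies by pip-audit version:

```python
import json
import subprocess

proc = subprocess.run(['pip-audit', '-f', 'json'], capture_output=True, text=True)
report = json.loads(proc.stdout) if proc.stdout else {}
```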
Verify package signatures with GPG:

```python
import gpg

# Detached-signature check with the GPGME bindings; verify() raises
# gpg.errors.BadSignatures when verification fails
with gpg.Context() as ctx:
    try:
        with open(package_file, 'rb') as data, open(package_sig, 'rb') as sig:
            ctx.verify(data, signature=sig)
        verified = True
    except gpg.errors.BadSignatures:
        verified = False
```
Rotate API keys with cryptographically secure tokens:

```python
import secrets

new_api_key = secrets.token_urlsafe(32)  # 32 bytes of CSPRNG randomness, URL-safe
```

Rate-limit API endpoints:

```python
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

# Flask-Limiter >=3 takes the key function as the first positional argument
limiter = Limiter(get_remote_address, app=app,
                  default_limits=["200 per day", "50 per hour"])
```

Verify request signatures with HMAC:

```python
import hmac

# Constant-time comparison prevents timing attacks;
# secret_key and request_body are bytes
expected = hmac.new(secret_key, request_body, 'sha256').hexdigest()
is_valid = hmac.compare_digest(expected, request_signature)
```
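The client side is symmetric. A sketch, assuming the secret is shared out of band and the helper name is illustrative:

```python
import hashlib
import hmac

def sign_request(secret_key: bytes, request_body: bytes) -> str:
    # Produces the signature the server recomputes and checks with compare_digest()
    return hmac.new(secret_key, request_body, hashlib.sha256).hexdigest()
```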
Fingerprint queries and detect extraction attempts:

```python
import hashlib
from collections import defaultdict

# MD5 is acceptable here: the fingerprint is for deduplication, not security
query_fingerprint = hashlib.md5(str(sorted(input_features)).encode()).hexdigest()
query_count = defaultdict(int)
query_count[query_fingerprint] += 1  # repeated identical queries are suspicious

# Many near-identical queries in a short window suggest systematic probing
is_extraction_attempt = sum(
    1 for q in recent_queries if similarity(q, current_query) > 0.95
) > threshold
```

Restrict response detail:

```python
output = ({"prediction": class_label} if restrict_info
          else {"prediction": class_label,
                "probabilities": all_probabilities,
                "confidence": confidence})
```
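The extraction check above assumes a `similarity()` helper. One minimal, purely illustrative implementation compares serialized queries:

```python
from difflib import SequenceMatcher

def similarity(a, b):
    # Swap in a domain-appropriate distance (embeddings, edit distance) as needed
    return SequenceMatcher(None, str(a), str(b)).ratio()
```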
Compare accuracy across demographic groups:

```python
import numpy as np
from sklearn.metrics import confusion_matrix

# y_true, y_pred, and demographics are aligned NumPy arrays
group_accuracy = {
    group: confusion_matrix(y_true[demographics == group],
                            y_pred[demographics == group]).trace()
           / len(y_true[demographics == group])
    for group in np.unique(demographics)
}
disparate_impact = max(group_accuracy.values()) / min(group_accuracy.values())
```

Penalize unfair models during training:

```python
fairness_penalty = sum(abs(group_accuracy[g] - mean_accuracy) for g in group_accuracy)
total_loss = classification_loss + fairness_weight * fairness_penalty
```

Or use fairlearn's MetricFrame:

```python
from fairlearn.metrics import MetricFrame
from sklearn.metrics import accuracy_score

metric_frame = MetricFrame(metrics={"accuracy": accuracy_score},
                           y_true=y_true, y_pred=y_pred,
                           sensitive_features=demographics)
```
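MetricFrame then exposes per-group results directly:

```python
print(metric_frame.by_group)       # accuracy broken down by demographic group
gaps = metric_frame.difference()   # largest between-group gap per metric
```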
Cap input size:

```python
MAX_INPUT_SIZE = 10000
is_valid = len(str(user_input)) <= MAX_INPUT_SIZE
```

Abort long-running inference with an alarm:

```python
import signal

def _timeout_handler(signum, frame):
    raise TimeoutError("Inference exceeded time budget")

signal.signal(signal.SIGALRM, _timeout_handler)
signal.alarm(30)  # 30-second timeout
```

Cap CPU and memory:

```python
import resource

resource.setrlimit(resource.RLIMIT_CPU, (30, 30))                    # 30 s max CPU
resource.setrlimit(resource.RLIMIT_AS, (512_000_000, 512_000_000))   # ~512 MB max memory
```
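In use, the guarded call looks like this (`model` and `x` are assumptions):

```python
try:
    signal.alarm(30)
    result = model.predict(x)
except TimeoutError:
    result = {"error": "inference timed out"}
finally:
    signal.alarm(0)  # always clear the pending alarm
```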
Check outputs against a knowledge base:

```python
from difflib import SequenceMatcher

# Flag output that matches nothing in the knowledge base closely enough
is_hallucination = max(
    SequenceMatcher(None, model_output, fact).ratio() for fact in knowledge_base
) < 0.7

# High confidence plus failed source verification is especially suspicious
is_unverified = model_confidence > 0.9 and not verify_against_sources(model_output, trusted_sources)
```

Ground answers with retrieval-augmented generation:

```python
from langchain.chains import RetrievalQA

# Grounding answers in retrieved documents reduces hallucination
qa_chain = RetrievalQA.from_chain_type(llm=llm, chain_type="stuff", retriever=retriever)
```
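Querying the chain is then a single call (legacy LangChain API; the question is illustrative):

```python
answer = qa_chain.run("What does the latest audit report conclude?")
```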
Pseudonymize sensitive values:

```python
import hashlib

# Note: an unsalted hash is pseudonymization, not true anonymization
anonymized_value = hashlib.sha256(sensitive_data.encode()).hexdigest()
```

Measure re-identification risk:

```python
# `t_closeness` stands in for whichever privacy-metrics library you use;
# there is no standard package with this exact API
from t_closeness import tcloseness

is_private = tcloseness(data, sensitive_columns, k=5) >= threshold
```

Minimize collected data:

```python
minimal_data = {k: v for k, v in user_data.items() if k in required_fields}
```

Validate inputs against a schema:

```python
from jsonschema import validate

validate(instance=data, schema=expected_schema)  # raises ValidationError on mismatch
```
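A minimal `expected_schema` for illustration; the field names are assumptions:

```python
expected_schema = {
    "type": "object",
    "properties": {
        "prompt": {"type": "string", "maxLength": 10000},
        "user_id": {"type": "string"},
    },
    "required": ["prompt", "user_id"],
    "additionalProperties": False,
}
```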
Check data quality before training:

```python
import pandas as pd

# Fraction of missing values per column; reject if any column exceeds 5%
data_quality = pd.DataFrame(data).isnull().sum() / len(data)
is_clean = (data_quality < 0.05).all()
```

Verify data provenance:

```python
import hashlib

verified = hashlib.sha256(str(sorted(data.items())).encode()).hexdigest() == provided_hash
```
Log security events with structure:

```python
import logging
from datetime import datetime

logger = logging.getLogger('ai_security')
logger.warning("Threat detected: %s | Input: %s | Timestamp: %s",
               threat_type, sanitized_input, datetime.now())
```

Detect anomalous model behavior:

```python
from sklearn.ensemble import IsolationForest

# Fit on a baseline window of metrics first; predict() returns -1 for anomalies
anomaly_detector = IsolationForest(contamination=0.1).fit(historical_metrics)
is_anomalous = anomaly_detector.predict([model_metrics])[0] == -1
```

Aggregate events by threat type:

```python
from collections import defaultdict

security_events = defaultdict(list)
if threat_detected:
    security_events[threat_type].append({"timestamp": now, "details": details})
```
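Aggregated events can then be flushed to the logger on a schedule. A small illustrative sketch:

```python
for threat, events in security_events.items():
    logger.warning("threat=%s count=%d", threat, len(events))
```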
Putting it together, a single guarded inference path:

```python
import signal

def secure_ai_inference(user_input, model, user_id):
    # 1. Input validation
    if len(user_input) > 10000:
        return {"error": "Input too large"}
    # 2. Injection detection
    if any(p in user_input.lower() for p in ['ignore', 'override']):
        return {"error": "Injection attempt detected"}
    # 3. Rate limiting, before spending compute on the request
    query_count[user_id] += 1
    if query_count[user_id] > 100:
        return {"error": "Rate limit exceeded"}
    # 4. Prediction with timeout (reuses _timeout_handler from above)
    signal.signal(signal.SIGALRM, _timeout_handler)
    signal.alarm(30)
    try:
        prediction = model.predict(user_input)
    finally:
        signal.alarm(0)
    # 5. Hallucination check: high confidence but no supporting facts
    if prediction['confidence'] > 0.9 and not verify_facts(prediction['output']):
        return {"warning": "Output could not be verified against sources"}
    # 6. Content filtering
    if any(word in prediction['output'].lower() for word in harmful_keywords):
        return {"error": "Unsafe output filtered"}
    # 7. Log and return
    logger.info("Safe inference: %s | Prediction: %s", user_id, prediction)
    return prediction
```
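A call site might look like this; `user_model` and the error handler are assumptions:

```python
response = secure_ai_inference("Summarize this report.", user_model, user_id="u-123")
if "error" in response:
    reject_request(response["error"])  # hypothetical handler
```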
Quick reference:

- Input sanitization: `re.sub()` or parameterized prompts
- Rate limiting: `@rate_limit` decorator or `flask_limiter`
- Content filtering: keyword matching or zero-shot classification
- Data validation: JSON schema or pandas quality checks
- Differential privacy: add Laplace noise to outputs
- Timeout protection: `signal.alarm()` or a timeout decorator
- Logging & monitoring: structured logging with security events
- Bias detection: per-group accuracy comparison
- Hallucination checks: fact verification against a knowledge base
- API security: HMAC verification and key rotation
- These are foundational solutions; production systems need comprehensive testing
- Combine multiple techniques for a defense-in-depth strategy
- Regular updates needed as threats evolve
- Consider performance implications when implementing security measures
- Always validate effectiveness through security audits and penetration testing