# detect.py
"""
DDoS detection starter project (defensive)
How to use:
1. Place your flow CSV(s) (bidirectional flows) in data/ and update DATA_PATH.
Recommended columns: src_ip, dst_ip, src_port, dst_port, protocol, flow_duration, total_fwd_packets,
total_bwd_packets, total_length_of_fwd_packets, total_length_of_bwd_packets, fwd_packet_length_max, ...
Also a target column named 'label' with values like 'BENIGN' and 'DDoS'.
2. Train model:
python ddos_detection_starter.py --train --data_path data/flows.csv --model_out models/rf_ddos.joblib
3. Serve a simple prediction API:
python ddos_detection_starter.py --serve --model_in models/rf_ddos.joblib --scaler_in models/scaler.joblib
NOTE: This is strictly for defensive research and testing on networks you own or are authorized to test.
Do NOT use this code to attack any systems.
"""
import argparse
import os
import joblib
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import classification_report, roc_auc_score
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn
# ----------------------- Data loading & preprocessing -----------------------

def load_flow_csv(path):
    """Load a CSV into a pandas DataFrame. Accepts a single CSV path or a folder path (loads all .csv files in the folder)."""
    if os.path.isdir(path):
        files = [os.path.join(path, f) for f in os.listdir(path) if f.endswith('.csv')]
        if not files:
            raise FileNotFoundError(f'No CSV files found in folder {path}')
        df = pd.concat([pd.read_csv(f) for f in files], ignore_index=True)
    else:
        df = pd.read_csv(path)
    return df
def preprocess(df, label_column='label'):
    """Basic preprocessing pipeline:
    - Drop columns that are obviously non-numeric for modeling (IPs) unless you engineer features from them.
    - Fill NaNs with column medians.
    - Map labels to binary: DDoS (1) vs BENIGN (0); adapt to your dataset's labels.
    - Select numeric features automatically.
    Returns: X (numeric feature DataFrame), y (Series)
    """
    df = df.copy()
    # Drop obvious identifiers (keep ports/protocol numbers if present)
    for col in ['src_ip', 'dst_ip', 'timestamp', 'start_time', 'end_time']:
        if col in df.columns:
            df.drop(columns=[col], inplace=True)
    if label_column not in df.columns:
        raise ValueError(f"Label column '{label_column}' not found in data")
    # Normalize label values and map to binary: after upper-casing, any label
    # containing 'DOS' (which also covers 'DDOS') counts as an attack.
    df[label_column] = df[label_column].astype(str).str.upper()
    df['target'] = df[label_column].apply(lambda v: 1 if 'DOS' in v else 0)
    # Select numeric columns only for the baseline model
    numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
    X = df[numeric_cols].drop(columns=['target'])
    y = df['target']
    # Fill NaNs with the column median (simple strategy)
    X = X.fillna(X.median())
    return X, y
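
# A minimal sketch (not wired into the CLI) of persisting the training
# feature order so create_api() can score requests with the exact columns
# used at fit time; the feature_order.joblib path is an assumption:
#
#   X, y = preprocess(df)
#   joblib.dump(X.columns.tolist(), 'models/feature_order.joblib')
#   # later, when serving:
#   app = create_api(model_path, scaler_path,
#                    feature_order=joblib.load('models/feature_order.joblib'))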
# ----------------------- Training & evaluation -----------------------

def train_model(X, y, model_out='models/rf_ddos.joblib', scaler_out='models/scaler.joblib'):
    os.makedirs(os.path.dirname(model_out) or '.', exist_ok=True)
    os.makedirs(os.path.dirname(scaler_out) or '.', exist_ok=True)
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, stratify=y, random_state=42)
    # Fit the scaler on training data only; it is saved alongside the model
    # because the API must apply the same transform at prediction time.
    scaler = StandardScaler()
    X_train_s = scaler.fit_transform(X_train)
    X_test_s = scaler.transform(X_test)
    clf = RandomForestClassifier(n_estimators=200, class_weight='balanced', random_state=42, n_jobs=-1)
    clf.fit(X_train_s, y_train)
    y_pred = clf.predict(X_test_s)
    y_proba = clf.predict_proba(X_test_s)[:, 1] if hasattr(clf, 'predict_proba') else None
    print('\n--- Classification report (test set) ---')
    print(classification_report(y_test, y_pred, digits=4))
    if y_proba is not None:
        try:
            auc = roc_auc_score(y_test, y_proba)
            print(f'ROC AUC: {auc:.4f}')
        except ValueError:
            # roc_auc_score raises if only one class is present in y_test
            pass
    # Save model and scaler
    joblib.dump(clf, model_out)
    joblib.dump(scaler, scaler_out)
    print(f'Saved model -> {model_out}')
    print(f'Saved scaler -> {scaler_out}')
    return clf, scaler
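
# Optional follow-up after training: RandomForestClassifier exposes
# feature_importances_, so ranking the most predictive flow features is a
# one-liner (assumes X is the DataFrame that was passed to train_model):
#
#   clf, scaler = train_model(X, y)
#   importances = pd.Series(clf.feature_importances_, index=X.columns)
#   print(importances.sort_values(ascending=False).head(10))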
# ----------------------- Lightweight prediction API -----------------------

class FlowFeatures(BaseModel):
    # Accept arbitrary numeric features as a dict {feature_name: value}
    features: dict


def create_api(model_path, scaler_path, feature_order=None):
    app = FastAPI()
    clf = joblib.load(model_path)
    scaler = joblib.load(scaler_path)

    @app.post('/predict')
    def predict(flow: FlowFeatures):
        feats = flow.features
        if feature_order is None:
            # Infer an order from the provided features (sorted). For correct
            # results, pass the exact column order used during training.
            cols = sorted(feats.keys())
        else:
            cols = feature_order
        X = np.array([[feats.get(c, 0.0) for c in cols]], dtype=float)
        Xs = scaler.transform(X)
        proba = clf.predict_proba(Xs)[0, 1] if hasattr(clf, 'predict_proba') else None
        pred = int(clf.predict(Xs)[0])
        return {'prediction': pred,
                'ddos_score': float(proba) if proba is not None else None,
                'feature_order': cols}

    return app
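
# A sample exchange with the /predict endpoint above (values illustrative;
# a real request must carry the full training feature set or the scaler
# will reject the input shape):
#
#   request:  {"features": {"flow_duration": 1500.0, "total_fwd_packets": 40, "total_bwd_packets": 2}}
#   response: {"prediction": 1, "ddos_score": 0.93,
#              "feature_order": ["flow_duration", "total_bwd_packets", "total_fwd_packets"]}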
# ----------------------- CLI -----------------------

def main():
    parser = argparse.ArgumentParser(description='DDoS detection starter script')
    parser.add_argument('--train', action='store_true', help='Train model from data')
    parser.add_argument('--data_path', type=str, default='data/flows.csv', help='Path to CSV or folder')
    parser.add_argument('--label', type=str, default='label', help='Label column name')
    parser.add_argument('--model_out', type=str, default='models/rf_ddos.joblib', help='Path to save model')
    parser.add_argument('--scaler_out', type=str, default='models/scaler.joblib', help='Path to save scaler')
    parser.add_argument('--serve', action='store_true', help='Run prediction API')
    parser.add_argument('--model_in', type=str, default='models/rf_ddos.joblib', help='Model path to load')
    parser.add_argument('--scaler_in', type=str, default='models/scaler.joblib', help='Scaler path to load')
    parser.add_argument('--host', type=str, default='0.0.0.0', help='API host')
    parser.add_argument('--port', type=int, default=8000, help='API port')
    args = parser.parse_args()

    if args.train:
        print('Loading data...')
        df = load_flow_csv(args.data_path)
        X, y = preprocess(df, label_column=args.label)
        print(f'Dataset shape: X={X.shape}, y={y.shape}')
        train_model(X, y, model_out=args.model_out, scaler_out=args.scaler_out)

    if args.serve:
        if not os.path.exists(args.model_in) or not os.path.exists(args.scaler_in):
            raise FileNotFoundError('Model or scaler not found. Train first or provide correct paths.')
        app = create_api(args.model_in, args.scaler_in, feature_order=None)
        print(f'Starting API on {args.host}:{args.port}...')
        uvicorn.run(app, host=args.host, port=args.port)


if __name__ == '__main__':
    main()