Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
534c9a4
main: ensure children (forks) don't register the same sigterm handler …
AlyaGomaa Feb 27, 2026
c71a2de
profiler: cancel join thread on all queues before closing them
AlyaGomaa Feb 27, 2026
a4be07e
increase profiler queue maxsize
AlyaGomaa Feb 27, 2026
a38703a
ensure all multiprocessing.Queues created are capped at 30MBs
AlyaGomaa Feb 27, 2026
81ca4b0
profiler: shutdown the syncmanager after all profilers are done using it
AlyaGomaa Feb 27, 2026
d7ad509
ensure all modules unsubscribe from all channels before closing in sh…
AlyaGomaa Feb 27, 2026
92d7a4b
Add sentinel value for each queue used in the profiler
AlyaGomaa Feb 27, 2026
bee4f3f
replace using the multiprocessing.Manager() in profiler.py because to…
AlyaGomaa Feb 27, 2026
4ea6e96
iasyncmodule: unsubscribe from all channels on shutdown
AlyaGomaa Feb 27, 2026
f644692
imodule: have a common _shutdown_gracefully() for all modules, that c…
AlyaGomaa Mar 2, 2026
2c9232d
profiler: move the monitoring log to its own thread that stays up as …
AlyaGomaa Mar 2, 2026
350d93b
profiler workers: only die when no more flows are there in the profil…
AlyaGomaa Mar 2, 2026
4b6de70
profiler: keep track of the number of started workers in the db
AlyaGomaa Mar 2, 2026
2f8a2de
input: put num_of_started_profiler_workers sentinel stop values in th…
AlyaGomaa Mar 2, 2026
017095f
profiler: start all workers, then immediately wait for them to stop i…
AlyaGomaa Mar 2, 2026
71a7840
icore: lazily set next_fps_check_time if not set
AlyaGomaa Mar 2, 2026
a806cdf
worker: make the only signal for workers to stop is the stop msg, del…
AlyaGomaa Mar 2, 2026
6a03379
iasync_module: unsubscribe from redis channels after the module is do…
AlyaGomaa Mar 2, 2026
33cfece
evidence: wait 30s instead of 1min for new evidence to arrive before …
AlyaGomaa Mar 2, 2026
51e5b49
Add a subscribe_to_channels() function in every single module and cal…
AlyaGomaa Mar 2, 2026
a61f207
imodule: make subscribe_to_channels() an abstractmethod and add a com…
AlyaGomaa Mar 2, 2026
2f286fc
add subscribe_to_channels() to ICore classes
AlyaGomaa Mar 2, 2026
083bbe1
add subscribe_to_channels() to profiler worker
AlyaGomaa Mar 2, 2026
6190aaf
Fix not being able to flush the db when starting a new slips instance…
AlyaGomaa Mar 2, 2026
5b8d1ba
delete redis.conf from git to avoid accidentally committing it with lo…
AlyaGomaa Mar 2, 2026
ac84e16
input classes: fix duplicate calls to mark_self_as_done_processing()
AlyaGomaa Mar 2, 2026
ca50af2
make sure that input is done before running the profiler's shutdown func
AlyaGomaa Mar 3, 2026
f60961f
profiler: start the profiler monitor loop only after input_handler_ob…
AlyaGomaa Mar 3, 2026
2c41a98
update unit tests
AlyaGomaa Mar 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .secrets.baseline
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
"filename": "config/slips.yaml",
"hashed_secret": "4cac50cee3ad8e462728e711eac3e670753d5016",
"is_verified": false,
"line_number": 223
"line_number": 268
}
],
"dataset/test14-malicious-zeek-dir/http.log": [
Expand Down Expand Up @@ -7185,5 +7185,5 @@
}
]
},
"generated_at": "2026-01-29T18:21:33Z"
"generated_at": "2026-03-02T22:46:58Z"
}
4 changes: 0 additions & 4 deletions config/redis.conf

This file was deleted.

23 changes: 14 additions & 9 deletions managers/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
from typing import (
List,
Tuple,
Dict,
Iterable,
)

from exclusiveprocess import (
Expand Down Expand Up @@ -58,18 +56,19 @@ def __init__(self, main):
# Can be used by signal handlers before startup finishes.
self.processes: List[Process] = []

# this is the queue that will be used by the input proces
# this is the queue that will be used by the input process
# to pass flows to the profiler
# this max size is decided based on the avg size of each flow and
# tha max memory (4g) that this queue is allowed to use
self.profiler_queue = Queue(maxsize=50000)
self.termination_event: Event = Event()
# this max size is decided based on the avg size of each flow (650
# bytes), and the max memory that this queue is allowed to
# use (1GB), so 1321528 bytes will be 2033 flows in queue at max
self.profiler_queue = Queue(maxsize=1321528)
self.termination_event = Event()
# to make sure we only warn the user once about
# the pending modules
self.warning_printed_once = False
# this one has its own termination event because we want it to
# shutdown at the very end of all other slips modules.
self.evidence_handler_termination_event: Event = Event()
self.evidence_handler_termination_event = Event()
self.stopped_modules = []
# used to stop slips when these 2 are done
# since the semaphore count is zero, slips.py will wait until another
Expand All @@ -85,6 +84,9 @@ def __init__(self, main):
# and input stops and renders the profiler queue useless and profiler
# cant get more lines anymore!
self.is_profiler_done_event = Event()
# is set by the input process to indicate no more flows are coming
# so profiler can safely begin shutdown/joins.
self.is_input_done_event = Event()
self.read_config()

def read_config(self):
Expand Down Expand Up @@ -122,6 +124,7 @@ def start_profiler_process(self):
is_profiler_done=self.is_profiler_done,
profiler_queue=self.profiler_queue,
is_profiler_done_event=self.is_profiler_done_event,
is_input_done_event=self.is_input_done_event,
)
profiler_process.start()
self.main.print(
Expand Down Expand Up @@ -173,6 +176,7 @@ def start_input_process(self):
zeek_dir=self.main.zeek_dir,
line_type=self.main.line_type,
is_profiler_done_event=self.is_profiler_done_event,
is_input_done_event=self.is_input_done_event,
)
input_process.start()
self.main.print(
Expand Down Expand Up @@ -570,7 +574,8 @@ def get_hitlist_in_order(self) -> Tuple[List[Process], List[Process]]:
self.main.db.get_pid_of("Exporting Alerts")
)
# remove all None PIDs. this happens when a module in that list
# isnt started in the current run.
# isnt started in the current run. e.g. virustotal module starts then
# stops immediately if no API is found. so its pid will be None.
pids_to_kill_last: List[int] = [
pid for pid in pids_to_kill_last if pid is not None
]
Expand Down
12 changes: 12 additions & 0 deletions managers/redis_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ def __init__(self, main):
self.end_port = 32850
self.running_logfile = "running_slips_info.txt"

def _clear_cached_redis_instance(self, port: int) -> None:
"""
Ensure RedisDB singleton cache doesn't block later startup flushes.
"""
if port in RedisDB.instances:
del RedisDB.instances[port]

def get_start_port(self):
return self.start_port

Expand Down Expand Up @@ -386,6 +393,9 @@ def get_redis_port(self) -> int:
):
self.main.terminate_slips()
return
# allow the DBManager obj created in main.py to reconnect
# and flush if needed
self._clear_cached_redis_instance(redis_port)

elif self.main.args.multiinstance:
redis_port = self.get_random_redis_port()
Expand Down Expand Up @@ -413,6 +423,8 @@ def get_redis_port(self) -> int:
f"{redis_port}."
)
self.main.terminate_slips()
# allow main DBManager to reconnect and flush if needed
self._clear_cached_redis_instance(redis_port)

return redis_port

Expand Down
58 changes: 34 additions & 24 deletions modules/anomaly_detection_https/anomaly_detection_https.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,9 @@ def update_min_std_floor(self):
candidate = self.floor_scale * max(q10, sigma_mad, self.floor_min)
candidate = min(self.floor_max, max(self.floor_min, candidate))
beta = min(1.0, max(0.0, self.floor_update_beta))
self.min_std_floor = (1.0 - beta) * self.min_std_floor + beta * candidate
self.min_std_floor = (
1.0 - beta
) * self.min_std_floor + beta * candidate

def zscore(self, value: float) -> float:
std = math.sqrt(max(self.var, self.min_std_floor * self.min_std_floor))
Expand Down Expand Up @@ -162,8 +164,6 @@ class AnomalyDetectionHTTPS(IModule):
authors = ["Sebastian Garcia"]

def init(self):
self.c1 = self.db.subscribe("new_ssl")
self.channels = {"new_ssl": self.c1}
self.classifier = FlowClassifier()
self.read_configuration()
self.operational_log_path = os.path.join(
Expand Down Expand Up @@ -205,6 +205,10 @@ def init(self):
"ADWIN requested but river is not installed; using pre-ADWIN drift logic.",
)

def subscribe_to_channels(self):
self.c1 = self.db.subscribe("new_ssl")
self.channels = {"new_ssl": self.c1}

def read_configuration(self):
conf = ConfigParser()
self.training_hours = conf.https_anomaly_training_hours()
Expand All @@ -221,9 +225,7 @@ def read_configuration(self):
self.ja3_min_variants_per_server = (
conf.https_anomaly_ja3_min_variants_per_server()
)
self.requested_use_adwin_drift = (
conf.https_anomaly_use_adwin_drift()
)
self.requested_use_adwin_drift = conf.https_anomaly_use_adwin_drift()
self.adwin_delta = conf.https_anomaly_adwin_delta()
self.adwin_clock = conf.https_anomaly_adwin_clock()
self.adwin_grace_period = conf.https_anomaly_adwin_grace_period()
Expand Down Expand Up @@ -318,9 +320,7 @@ def log_event(
reset = "\033[0m" if self.log_colors else ""
wall_clock = self._ts_to_iso()
traffic_clock = (
self._ts_to_iso(traffic_ts)
if traffic_ts is not None
else "n/a"
self._ts_to_iso(traffic_ts) if traffic_ts is not None else "n/a"
)
metrics_json = json.dumps(metrics, sort_keys=True)
line = (
Expand All @@ -329,7 +329,9 @@ def log_event(
)
if color:
line = f"{color}{line}{reset}"
with open(self.operational_log_path, "a", encoding="utf-8") as log_file:
with open(
self.operational_log_path, "a", encoding="utf-8"
) as log_file:
log_file.write(f"{line}\n")

@staticmethod
Expand All @@ -344,7 +346,9 @@ def to_float(value, default=0.0) -> float:
except (TypeError, ValueError):
return float(default)

def get_traffic_ts(self, flow, fallback_ts: Optional[float] = None) -> float:
def get_traffic_ts(
self, flow, fallback_ts: Optional[float] = None
) -> float:
"""
Returns traffic timestamp from flow.starttime.
Detection windows must use traffic time, not host wall-clock time.
Expand Down Expand Up @@ -466,7 +470,9 @@ def evidence_ts_from_traffic_ts(ts: float) -> str:
)

@staticmethod
def threat_level_from_confidence_level(confidence_level: str) -> ThreatLevel:
def threat_level_from_confidence_level(
confidence_level: str,
) -> ThreatLevel:
# Requested policy:
# - confidence low/medium -> threat level low
# - confidence high -> threat level medium
Expand Down Expand Up @@ -500,7 +506,7 @@ def build_victim(
)
return None

def emit_anomaly_evidence(
def set_anomaly_evidence(
self,
profileid: str,
twid_number: int,
Expand Down Expand Up @@ -571,7 +577,11 @@ def emit_anomaly_evidence(
f"reason={reason_name}; value={value}; why={why}"
)

reasons_text = " | ".join(reason_parts) if reason_parts else "reason=Unknown; value=; why=not provided"
reasons_text = (
" | ".join(reason_parts)
if reason_parts
else "reason=Unknown; value=; why=not provided"
)
description = (
f"HTTPS anomaly: type={kind}; confidence={confidence.get('level')} "
f"({confidence_score:.3f}); {reasons_text}."
Expand Down Expand Up @@ -699,9 +709,8 @@ def finalize_hour_bucket(self, profileid: str, state: HostState):
ssl_flows = float(bucket.ssl_flows)
known_server_avg_bytes = 0.0
if bucket.known_servers_flow_count > 0:
known_server_avg_bytes = (
bucket.known_servers_total_bytes
/ float(bucket.known_servers_flow_count)
known_server_avg_bytes = bucket.known_servers_total_bytes / float(
bucket.known_servers_flow_count
)

features = {
Expand Down Expand Up @@ -743,7 +752,9 @@ def finalize_hour_bucket(self, profileid: str, state: HostState):
for feature_name, value in features.items():
z = z_by_feature.get(feature_name, 0.0)
if z >= self.hourly_zscore_threshold:
model = self.get_or_create_hourly_model(state, feature_name)
model = self.get_or_create_hourly_model(
state, feature_name
)
hourly_anomalies.append(
{
"feature": feature_name,
Expand Down Expand Up @@ -787,7 +798,7 @@ def finalize_hour_bucket(self, profileid: str, state: HostState):
"anomalies": hourly_anomalies,
},
)
self.emit_anomaly_evidence(
self.set_anomaly_evidence(
profileid=profileid,
twid_number=state.last_twid,
traffic_ts=bucket.start_ts,
Expand Down Expand Up @@ -996,7 +1007,8 @@ def process_ssl_event(
twid_number: int,
):
ts = self.get_traffic_ts(
ssl_flow, fallback_ts=self.to_float(conn_info.get("starttime"), 0.0)
ssl_flow,
fallback_ts=self.to_float(conn_info.get("starttime"), 0.0),
)
state = self.ensure_hour_bucket(profileid, ts)
state.last_twid = twid_number
Expand Down Expand Up @@ -1111,7 +1123,7 @@ def process_ssl_event(
"flow_anomalies": flow_anomalies,
},
)
self.emit_anomaly_evidence(
self.set_anomaly_evidence(
profileid=profileid,
twid_number=twid_number,
traffic_ts=ts,
Expand Down Expand Up @@ -1244,9 +1256,7 @@ def process_ssl_event(
"flow_raw_signals": flow_raw_signals,
"alpha": alpha,
"fit_method": (
"welford_online_moments"
if alpha is None
else "ewma"
"welford_online_moments" if alpha is None else "ewma"
),
},
)
Expand Down
14 changes: 8 additions & 6 deletions modules/arp/arp.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ class ARP(IModule):
authors = ["Alya Gomaa"]

def init(self):
self.c1 = self.db.subscribe("new_arp")
self.c2 = self.db.subscribe("tw_closed")
self.channels = {
"new_arp": self.c1,
"tw_closed": self.c2,
}
self.read_configuration()
self.classifier = FlowClassifier()
# this dict will categorize arp requests by profileid_twid
Expand Down Expand Up @@ -68,6 +62,14 @@ def init(self):
self.is_zeek_running: bool = self.is_running_zeek()
self.evidence_filter = ARPEvidenceFilter(self.conf, self.args, self.db)

def subscribe_to_channels(self):
self.c1 = self.db.subscribe("new_arp")
self.c2 = self.db.subscribe("tw_closed")
self.channels = {
"new_arp": self.c1,
"tw_closed": self.c2,
}

def read_configuration(self):
conf = ConfigParser()
self.home_network = conf.home_network_ranges
Expand Down
14 changes: 8 additions & 6 deletions modules/arp_poisoner/arp_poisoner.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,6 @@ class ARPPoisoner(IModule):
authors = ["Alya Gomaa"]

def init(self):
self.c1 = self.db.subscribe("new_blocking")
self.c2 = self.db.subscribe("tw_closed")
self.channels = {
"new_blocking": self.c1,
"tw_closed": self.c2,
}
self._time_since_last_repoison = {}
self._time_since_last_internet_cut = {}
self.log_file_path = os.path.join(self.output_dir, "arp_poisoning.log")
Expand All @@ -60,6 +54,14 @@ def init(self):
# keeps track of which interface were blocked ips attacking on
self.ip_interface_map = {}

def subscribe_to_channels(self):
self.c1 = self.db.subscribe("new_blocking")
self.c2 = self.db.subscribe("tw_closed")
self.channels = {
"new_blocking": self.c1,
"tw_closed": self.c2,
}

def log(self, text):
"""Logs the given text to the blocking log file"""
with self.blocking_logfile_lock:
Expand Down
14 changes: 8 additions & 6 deletions modules/blocking/blocking.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,6 @@ class Blocking(IModule):
authors = ["Sebastian Garcia, Alya Gomaa"]

def init(self):
self.c1 = self.db.subscribe("new_blocking")
self.c2 = self.db.subscribe("tw_closed")
self.channels = {
"new_blocking": self.c1,
"tw_closed": self.c2,
}
if platform.system() == "Darwin":
self.print("Mac OS blocking is not supported yet.")
sys.exit()
Expand All @@ -54,6 +48,14 @@ def init(self):
self.ap_info: None | Dict[str, str] = self.db.get_ap_info()
self.is_running_in_ap_mode = True if self.ap_info else False

def subscribe_to_channels(self):
self.c1 = self.db.subscribe("new_blocking")
self.c2 = self.db.subscribe("tw_closed")
self.channels = {
"new_blocking": self.c1,
"tw_closed": self.c2,
}

def log(self, text: str):
"""Logs the given text to the blocking log file"""
with self.blocking_logfile_lock:
Expand Down
4 changes: 3 additions & 1 deletion modules/cesnet/cesnet.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@ class CESNET(IModule):

def init(self):
self.read_configuration()
self.stop_module = False

def subscribe_to_channels(self):
self.c1 = self.db.subscribe("export_evidence")
self.channels = {
"export_evidence": self.c1,
}
self.stop_module = False

def read_configuration(self):
"""Read importing/exporting preferences from slips.yaml"""
Expand Down
Loading
Loading