diff --git a/Cargo.lock b/Cargo.lock index f2506ca..5d9225f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -377,6 +377,22 @@ dependencies = [ "zeroize", ] +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "ff" version = "0.13.1" @@ -512,6 +528,12 @@ version = "0.2.178" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +[[package]] +name = "linux-raw-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" + [[package]] name = "lock_api" version = "0.4.14" @@ -543,6 +565,15 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "df39d232f5c40b0891c10216992c2f250c054105cb1e56f0fc9032db6203ecc1" +[[package]] +name = "memmap2" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "714098028fe011992e1c3962653c96b2d578c4b4bce9036e15ff220319b1e0e3" +dependencies = [ + "libc", +] + [[package]] name = "memoffset" version = "0.9.1" @@ -918,6 +949,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" +dependencies = [ + "bitflags 2.11.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys", +] + [[package]] name = "rustversion" version = "1.0.22" @@ -1030,7 +1074,7 @@ dependencies = [ [[package]] name = 
"stackforge" -version = "0.6.0" +version = "0.6.1" dependencies = [ "pyo3", "stackforge-automata", @@ -1039,7 +1083,7 @@ dependencies = [ [[package]] name = "stackforge-automata" -version = "0.6.0" +version = "0.6.1" dependencies = [ "bytes", "stackforge-core", @@ -1047,7 +1091,7 @@ dependencies = [ [[package]] name = "stackforge-core" -version = "0.6.0" +version = "0.6.1" dependencies = [ "aes", "aes-gcm", @@ -1061,6 +1105,7 @@ dependencies = [ "hkdf", "hmac", "md-5", + "memmap2", "p256", "pcap-file", "pnet_datalink", @@ -1069,6 +1114,7 @@ dependencies = [ "sha1", "sha2", "smallvec", + "tempfile", "thiserror 2.0.17", "x25519-dalek", ] @@ -1128,6 +1174,19 @@ version = "0.13.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b1dd07eb858a2067e2f3c7155d54e929265c264e6f37efe3ee7a8d1b5a1dd0ba" +[[package]] +name = "tempfile" +version = "3.25.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0136791f7c95b1f6dd99f9cc786b91bb81c3800b639b3478e561ddb7be95e5f1" +dependencies = [ + "fastrand", + "getrandom 0.3.4", + "once_cell", + "rustix", + "windows-sys", +] + [[package]] name = "thiserror" version = "1.0.69" @@ -1254,6 +1313,15 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-sys" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae137229bcbd6cdf0f7b80a31df61766145077ddf49416a728b02cb3921ff3fc" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-targets" version = "0.48.5" diff --git a/Cargo.toml b/Cargo.toml index e576aa0..7fa9eb6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ shared-version = true tag-name = "v{{version}}" [workspace.package] -version = "0.6.0" +version = "0.6.1" edition = "2024" license = "GPL-3.0-only" authors = ["Stackforge Contributors"] @@ -31,6 +31,8 @@ bytes = "1.11.0" 
smallvec = "1.15.1" thiserror = "2.0.17" dashmap = "6.1" +memmap2 = "0.9" +tempfile = "3" stackforge-core = { path = "crates/stackforge-core" } stackforge-automata = { path = "crates/stackforge-automata" } diff --git a/README.md b/README.md index 3df6337..36bd1f6 100644 --- a/README.md +++ b/README.md @@ -11,9 +11,10 @@ - **Scapy-style API** — Stack layers with `Ether() / IP() / TCP()`, set fields with keyword arguments - **High Performance** — Core logic in Rust, zero-copy parsing, copy-on-write mutation -- **Broad Protocol Support** — Ethernet, ARP, IPv4/IPv6, TCP, UDP, ICMP/ICMPv6 (with echo correlation), DNS, HTTP/1.x, HTTP/2, QUIC, L2TP, 802.11 (Wi-Fi), 802.15.4 (Zigbee), and custom protocols -- **Stateful Flow Extraction** — Extract bidirectional conversations from PCAP files with TCP state tracking, stream reassembly, UDP timeout handling, and optional max packet/flow length tracking -- **PCAP I/O** — Read and write pcap files with `rdpcap()` / `wrpcap()` +- **Broad Protocol Support** — Ethernet, ARP, IPv4/IPv6, TCP, UDP, ICMP/ICMPv6 (with echo correlation), DNS, HTTP/1.x, HTTP/2, QUIC, L2TP, MQTT, MQTT-SN, Modbus, Z-Wave, FTP, TFTP, SMTP, POP3, IMAP, 802.11 (Wi-Fi), 802.15.4 (Zigbee), and custom protocols +- **Stateful Flow Extraction** — Extract bidirectional conversations from PCAP/PcapNG files with TCP state tracking, stream reassembly, UDP timeout handling, and optional max packet/flow length tracking +- **Memory-Budgeted Streaming** — Process gigabyte-sized captures without loading everything into RAM; set a memory budget and reassembly buffers automatically spill to memory-mapped temp files +- **PCAP & PcapNG I/O** — Read and write both classic PCAP and PcapNG files with auto-detection via `rdpcap()` / `wrpcap()` / `wrpcapng()` - **Python Bindings** — Seamless integration via PyO3/maturin - **Custom Protocols** — Define runtime protocols with `CustomLayer` and typed fields @@ -77,10 +78,10 @@ print(pkt.summary()) # "Ethernet / IPv4 / TCP" 
print(pkt.show()) # detailed layer view ``` -### Read and write PCAP files +### Read and write PCAP / PcapNG files ```python -from stackforge import rdpcap, wrpcap, PcapReader, Ether, IP, TCP +from stackforge import rdpcap, wrpcap, wrpcapng, PcapReader, Ether, IP, TCP # Write packets to a pcap file packets = [ @@ -89,13 +90,20 @@ packets = [ ] wrpcap("capture.pcap", packets) -# Read all packets at once -packets = rdpcap("capture.pcap") +# Write PcapNG format explicitly +wrpcapng("capture.pcapng", packets) + +# wrpcap auto-detects format from extension +wrpcap("capture.pcapng", packets) # writes PcapNG + +# Read any format (auto-detected) +packets = rdpcap("capture.pcap") # classic PCAP +packets = rdpcap("capture.pcapng") # PcapNG — same API for pkt in packets: print(pkt.summary()) -# Stream large pcap files -for pkt in PcapReader("large_capture.pcap"): +# Stream large captures (works with both formats) +for pkt in PcapReader("large_capture.pcapng"): print(pkt.summary()) ``` @@ -257,6 +265,36 @@ pkt.parse() print(pkt.has_layer(LayerKind.L2tp)) ``` +### IoT Protocols + +```python +from stackforge import MQTT, MQTTSN, Modbus, ZWave + +# MQTT (auto-detected on TCP port 1883) +pkt = Ether() / IP() / TCP(dport=1883) / MQTT(msg_type=1) # CONNECT + +# MQTT-SN (auto-detected on UDP port 1883) +pkt = Ether() / IP() / UDP(dport=1883) / MQTTSN(msg_type=0x04) # PUBLISH + +# Modbus TCP (auto-detected on TCP port 502) +pkt = Ether() / IP() / TCP(dport=502) / Modbus(func_code=3, data=b"\x00\x01\x00\x0a") + +# Z-Wave (wireless, not auto-detected over TCP/UDP) +pkt = ZWave(home_id=0x12345678, src=1, dst=2, cmd_class=0x25, cmd=0x01) +``` + +### Email & File Transfer Protocols + +```python +from stackforge import FTP, TFTP, SMTP, POP3, IMAP + +# FTP (TCP port 21), SMTP (TCP ports 25/587/465), POP3 (TCP port 110), IMAP (TCP port 143) +# All auto-detected during packet parsing + +# TFTP (UDP port 69) +pkt = Ether() / IP() / UDP(dport=69) / TFTP(opcode=1, filename="test.txt", 
mode="octet") +``` + ### Stateful Flow Extraction Extract bidirectional conversations from PCAP captures with full TCP state machine tracking, stream reassembly, and UDP timeout-based flow grouping. @@ -292,7 +330,7 @@ packets = rdpcap("capture.pcap") conversations = extract_flows_from_packets(packets) ``` -Customize timeouts and buffer limits with `FlowConfig`: +Customize timeouts, buffer limits, and memory budget with `FlowConfig`: ```python config = FlowConfig( @@ -303,6 +341,20 @@ config = FlowConfig( conversations = extract_flows("capture.pcap", config=config) ``` +#### Memory-Budgeted Flow Extraction + +For large captures, set a memory budget so reassembly buffers automatically spill to disk when RAM is tight: + +```python +config = FlowConfig( + memory_budget=256 * 1024 * 1024, # 256 MB RAM budget + spill_dir="/tmp/stackforge-spill", # optional custom spill directory +) +conversations = extract_flows("large_capture.pcapng", config=config) +``` + +Packets stream from disk one at a time (never loaded all at once). When TCP reassembly buffers exceed the budget, the largest buffers are transparently spilled to memory-mapped temp files and read back on demand. Temp files are automatically cleaned up via RAII. 
+ Optional: Track maximum packet sizes during flow extraction: ```python diff --git a/crates/stackforge-core/Cargo.toml b/crates/stackforge-core/Cargo.toml index 4d02d7d..316ec12 100644 --- a/crates/stackforge-core/Cargo.toml +++ b/crates/stackforge-core/Cargo.toml @@ -15,6 +15,8 @@ default-net = "0.22.0" rand = { version = "0.9.2", optional = true } pcap-file = "2" dashmap = { workspace = true } +memmap2 = { workspace = true } +tempfile = { workspace = true } # TLS crypto dependencies hmac = { version = "0.12", optional = true } diff --git a/crates/stackforge-core/src/flow/config.rs b/crates/stackforge-core/src/flow/config.rs index 1255d99..2ab5898 100644 --- a/crates/stackforge-core/src/flow/config.rs +++ b/crates/stackforge-core/src/flow/config.rs @@ -1,3 +1,4 @@ +use std::path::PathBuf; use std::time::Duration; /// Configuration for the flow extraction engine. @@ -24,6 +25,11 @@ pub struct FlowConfig { pub track_max_packet_len: bool, /// Track maximum flow length per direction (default: false). pub track_max_flow_len: bool, + /// Total RAM budget for flow extraction (None = unlimited). + /// When set, reassembly buffers will be spilled to disk when exceeded. + pub memory_budget: Option<usize>, + /// Directory for spill files (None = system temp dir).
+ pub spill_dir: Option<PathBuf>, } impl Default for FlowConfig { @@ -38,6 +44,8 @@ eviction_interval: Duration::from_secs(30), track_max_packet_len: false, track_max_flow_len: false, + memory_budget: None, + spill_dir: None, } } } diff --git a/crates/stackforge-core/src/flow/error.rs b/crates/stackforge-core/src/flow/error.rs index 3c8d483..68d8cc2 100644 --- a/crates/stackforge-core/src/flow/error.rs +++ b/crates/stackforge-core/src/flow/error.rs @@ -20,6 +20,9 @@ pub enum FlowError { #[error("too many discontinuous fragments ({count}, limit {limit})")] TooManyFragments { count: usize, limit: usize }, + #[error("disk spill I/O error: {0}")] + SpillError(String), + #[error(transparent)] PacketError(#[from] PacketError), } diff --git a/crates/stackforge-core/src/flow/mod.rs b/crates/stackforge-core/src/flow/mod.rs index 0690ba3..c48886b 100644 --- a/crates/stackforge-core/src/flow/mod.rs +++ b/crates/stackforge-core/src/flow/mod.rs @@ -29,6 +29,7 @@ pub mod config; pub mod error; pub mod icmp_state; pub mod key; +pub mod spill; pub mod state; pub mod table; pub mod tcp_reassembly; @@ -51,9 +52,11 @@ pub use tcp_state::{TcpConnectionState, TcpConversationState, TcpEndpointState}; pub use udp_state::UdpFlowState; use std::collections::HashMap; +use std::path::Path; +use crate::error::PacketError; use crate::layer::LayerKind; -use crate::pcap::CapturedPacket; +use crate::pcap::{CaptureIterator, CapturedPacket}; /// Extract bidirectional conversations from a list of captured packets. /// @@ -82,6 +85,45 @@ pub fn extract_flows_with_config( Ok(table.into_conversations()) } +/// Extract flows from a streaming packet source (iterator). +/// +/// Does not require all packets in memory simultaneously — each packet is +/// processed and then dropped. Only conversation state (metadata + reassembly +/// buffers) is retained. +/// +/// If `config.memory_budget` is set, reassembly buffers will be spilled to +/// disk when the budget is exceeded.
+pub fn extract_flows_streaming( + packets: I, + config: FlowConfig, +) -> Result, FlowError> +where + I: Iterator<Item = Result<CapturedPacket, PacketError>>, +{ + let table = ConversationTable::new(config); + + for (index, result) in packets.enumerate() { + let captured = result.map_err(FlowError::PacketError)?; + let timestamp = captured.metadata.timestamp; + table.ingest_packet(&captured.packet, timestamp, index)?; + // `captured` is dropped here — packet memory freed immediately + } + + Ok(table.into_conversations()) +} + +/// Extract flows directly from a capture file (PCAP or PcapNG). +/// +/// Streams packets from disk — never loads the entire file into memory. +/// The file format is auto-detected from magic bytes. +pub fn extract_flows_from_file( + path: impl AsRef<Path>, + config: FlowConfig, +) -> Result, FlowError> { + let iter = CaptureIterator::open(path).map_err(FlowError::PacketError)?; + extract_flows_streaming(iter, config) +} + /// Extract Z-Wave conversations from a list of captured packets. /// /// Z-Wave is a wireless protocol not carried over IP, so it needs its own @@ -155,6 +197,7 @@ mod tests { metadata: PcapMetadata { timestamp: Duration::from_secs(timestamp_secs), orig_len: 0, + ..Default::default() }, } } diff --git a/crates/stackforge-core/src/flow/spill.rs b/crates/stackforge-core/src/flow/spill.rs new file mode 100644 index 0000000..66edcd0 --- /dev/null +++ b/crates/stackforge-core/src/flow/spill.rs @@ -0,0 +1,298 @@ +//! Disk spill manager for memory-budgeted flow extraction. +//! +//! Provides `ReassemblyStorage` for data that can be transparently spilled to +//! mmap'd temp files when RAM budget is exceeded, and `MemoryTracker` for +//! global memory accounting. + +use std::io::Write; +use std::path::Path; +use std::sync::atomic::{AtomicUsize, Ordering}; + +use memmap2::Mmap; +use tempfile::NamedTempFile; + +/// A handle to reassembly data that may be in memory or on disk. +/// +/// When in memory, behaves like a `Vec<u8>`.
When spilled, data lives in a +/// temporary file and is read back via `mmap` on demand. The temp file is +/// auto-deleted when this value is dropped. +#[derive(Debug)] +pub enum ReassemblyStorage { + /// Data held in memory. + InMemory(Vec<u8>), + /// Data flushed to a memory-mapped temporary file. + OnDisk { file: NamedTempFile, len: usize }, +} + +impl ReassemblyStorage { + /// Create new empty in-memory storage. + pub fn new() -> Self { + Self::InMemory(Vec::new()) + } + + /// Current byte length of stored data. + pub fn len(&self) -> usize { + match self { + Self::InMemory(v) => v.len(), + Self::OnDisk { len, .. } => *len, + } + } + + /// Whether storage is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Append data. Only valid when `InMemory`. + /// + /// # Panics + /// Panics if storage has been spilled to disk. Callers must ensure data + /// is not appended after a spill (in practice, spilling only happens + /// for completed/idle flows). + pub fn extend_from_slice(&mut self, data: &[u8]) { + match self { + Self::InMemory(v) => v.extend_from_slice(data), + Self::OnDisk { .. } => { + panic!("cannot extend spilled storage; data already on disk"); + }, + } + } + + /// Spill in-memory data to disk, returning the number of bytes freed. + /// + /// If already on disk or empty, returns 0. + pub fn spill_to_disk(&mut self, spill_dir: Option<&Path>) -> std::io::Result<usize> { + let old = std::mem::replace(self, Self::InMemory(Vec::new())); + match old { + Self::InMemory(data) => { + if data.is_empty() { + *self = Self::InMemory(data); + return Ok(0); + } + let freed = data.len(); + let mut tmpfile = match spill_dir { + Some(dir) => NamedTempFile::new_in(dir)?, + None => NamedTempFile::new()?, + }; + tmpfile.write_all(&data)?; + tmpfile.flush()?; + *self = Self::OnDisk { + file: tmpfile, + len: freed, + }; + Ok(freed) + }, + already_on_disk @ Self::OnDisk { .. } => { + *self = already_on_disk; + Ok(0) + }, + } + } + + /// Read all data back.
Works for both in-memory and on-disk storage. + pub fn read_all(&self) -> std::io::Result<Vec<u8>> { + match self { + Self::InMemory(v) => Ok(v.clone()), + Self::OnDisk { file, len } => { + if *len == 0 { + return Ok(Vec::new()); + } + // Safety: the file is exclusively ours (NamedTempFile), and we + // only read from it after flushing. The mmap is short-lived. + let mmap = unsafe { Mmap::map(file.as_file())? }; + Ok(mmap[..*len].to_vec()) + }, + } + } + + /// Get a reference to in-memory data. Returns `None` if spilled. + pub fn as_slice(&self) -> Option<&[u8]> { + match self { + Self::InMemory(v) => Some(v), + Self::OnDisk { .. } => None, + } + } + + /// Whether data is currently on disk. + pub fn is_spilled(&self) -> bool { + matches!(self, Self::OnDisk { .. }) + } + + /// Bytes currently held in memory (0 if spilled). + pub fn in_memory_bytes(&self) -> usize { + match self { + Self::InMemory(v) => v.len(), + Self::OnDisk { .. } => 0, + } + } + + /// Drain and return data, resetting to empty. Reads from disk if spilled. + pub fn drain(&mut self) -> std::io::Result<Vec<u8>> { + let data = self.read_all()?; + *self = Self::InMemory(Vec::new()); + Ok(data) + } +} + +impl Default for ReassemblyStorage { + fn default() -> Self { + Self::new() + } +} + +/// Global memory tracker for the flow extraction engine. +/// +/// Uses atomic operations for thread-safe accounting without locks. +#[derive(Debug)] +pub struct MemoryTracker { + /// Current estimated memory usage in bytes. + current: AtomicUsize, + /// Budget limit (None = unlimited). + budget: Option<usize>, +} + +impl MemoryTracker { + /// Create a new tracker with an optional budget. + pub fn new(budget: Option<usize>) -> Self { + Self { + current: AtomicUsize::new(0), + budget, + } + } + + /// Record newly allocated bytes. + pub fn add(&self, bytes: usize) { + self.current.fetch_add(bytes, Ordering::Relaxed); + } + + /// Record freed bytes.
+ pub fn subtract(&self, bytes: usize) { + self.current.fetch_sub(bytes, Ordering::Relaxed); + } + + /// Current estimated memory usage. + pub fn current_usage(&self) -> usize { + self.current.load(Ordering::Relaxed) + } + + /// Whether current usage exceeds the budget. + pub fn is_over_budget(&self) -> bool { + match self.budget { + Some(b) => self.current_usage() > b, + None => false, + } + } + + /// Whether a budget has been set. + pub fn has_budget(&self) -> bool { + self.budget.is_some() + } + + /// The configured budget, if any. + pub fn budget(&self) -> Option<usize> { + self.budget + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_reassembly_storage_in_memory() { + let mut s = ReassemblyStorage::new(); + assert!(s.is_empty()); + assert_eq!(s.len(), 0); + assert!(!s.is_spilled()); + + s.extend_from_slice(b"hello"); + assert_eq!(s.len(), 5); + assert_eq!(s.as_slice(), Some(b"hello".as_slice())); + assert_eq!(s.in_memory_bytes(), 5); + + let data = s.read_all().unwrap(); + assert_eq!(data, b"hello"); + } + + #[test] + fn test_reassembly_storage_spill_and_read() { + let mut s = ReassemblyStorage::new(); + s.extend_from_slice(b"test data for spill"); + + let freed = s.spill_to_disk(None).unwrap(); + assert_eq!(freed, 19); + assert!(s.is_spilled()); + assert_eq!(s.len(), 19); + assert_eq!(s.in_memory_bytes(), 0); + assert!(s.as_slice().is_none()); + + let data = s.read_all().unwrap(); + assert_eq!(data, b"test data for spill"); + } + + #[test] + fn test_reassembly_storage_spill_empty() { + let mut s = ReassemblyStorage::new(); + let freed = s.spill_to_disk(None).unwrap(); + assert_eq!(freed, 0); + assert!(!s.is_spilled()); + } + + #[test] + fn test_reassembly_storage_double_spill() { + let mut s = ReassemblyStorage::new(); + s.extend_from_slice(b"data"); + s.spill_to_disk(None).unwrap(); + + // Second spill should be a no-op + let freed = s.spill_to_disk(None).unwrap(); + assert_eq!(freed, 0); + } + + #[test] + fn test_reassembly_storage_drain()
{ + let mut s = ReassemblyStorage::new(); + s.extend_from_slice(b"drain me"); + let data = s.drain().unwrap(); + assert_eq!(data, b"drain me"); + assert!(s.is_empty()); + } + + #[test] + fn test_reassembly_storage_drain_spilled() { + let mut s = ReassemblyStorage::new(); + s.extend_from_slice(b"spilled drain"); + s.spill_to_disk(None).unwrap(); + let data = s.drain().unwrap(); + assert_eq!(data, b"spilled drain"); + assert!(s.is_empty()); + assert!(!s.is_spilled()); + } + + #[test] + fn test_memory_tracker_no_budget() { + let tracker = MemoryTracker::new(None); + assert!(!tracker.has_budget()); + tracker.add(1_000_000); + assert!(!tracker.is_over_budget()); + } + + #[test] + fn test_memory_tracker_with_budget() { + let tracker = MemoryTracker::new(Some(1000)); + assert!(tracker.has_budget()); + assert_eq!(tracker.budget(), Some(1000)); + + tracker.add(500); + assert_eq!(tracker.current_usage(), 500); + assert!(!tracker.is_over_budget()); + + tracker.add(600); + assert_eq!(tracker.current_usage(), 1100); + assert!(tracker.is_over_budget()); + + tracker.subtract(200); + assert_eq!(tracker.current_usage(), 900); + assert!(!tracker.is_over_budget()); + } +} diff --git a/crates/stackforge-core/src/flow/state.rs b/crates/stackforge-core/src/flow/state.rs index b3c37e6..eaee312 100644 --- a/crates/stackforge-core/src/flow/state.rs +++ b/crates/stackforge-core/src/flow/state.rs @@ -323,9 +323,30 @@ mod tests { fn test_record_packet() { let mut state = ConversationState::new(test_key(), Duration::from_secs(1)); - state.record_packet(FlowDirection::Forward, 100, Duration::from_secs(1), 0); - state.record_packet(FlowDirection::Reverse, 200, Duration::from_secs(2), 1); - state.record_packet(FlowDirection::Forward, 50, Duration::from_secs(3), 2); + state.record_packet( + FlowDirection::Forward, + 100, + Duration::from_secs(1), + 0, + false, + false, + ); + state.record_packet( + FlowDirection::Reverse, + 200, + Duration::from_secs(2), + 1, + false, + false, + ); + 
state.record_packet( FlowDirection::Forward, 50, Duration::from_secs(3), 2, false, false, ); assert_eq!(state.total_packets(), 3); assert_eq!(state.total_bytes(), 350); diff --git a/crates/stackforge-core/src/flow/table.rs b/crates/stackforge-core/src/flow/table.rs index 9b76f4f..da239e5 100644 --- a/crates/stackforge-core/src/flow/table.rs +++ b/crates/stackforge-core/src/flow/table.rs @@ -1,3 +1,4 @@ +use std::sync::Arc; use std::time::Duration; use dashmap::DashMap; @@ -7,25 +8,30 @@ use crate::Packet; use super::config::FlowConfig; use super::error::FlowError; use super::key::{CanonicalKey, extract_key}; +use super::spill::MemoryTracker; use super::state::{ConversationState, ProtocolState}; /// Thread-safe conversation tracking table backed by `DashMap`. /// /// Supports concurrent packet ingestion from multiple threads while /// maintaining per-conversation state including TCP state machines -/// and stream reassembly. +/// and stream reassembly. Optionally tracks memory usage and spills +/// reassembly buffers to disk when a budget is exceeded. pub struct ConversationTable { conversations: DashMap<CanonicalKey, ConversationState>, config: FlowConfig, + memory_tracker: Arc<MemoryTracker>, } impl ConversationTable { /// Create a new table with the given configuration.
#[must_use] pub fn new(config: FlowConfig) -> Self { + let memory_tracker = Arc::new(MemoryTracker::new(config.memory_budget)); Self { conversations: DashMap::new(), config, + memory_tracker, } } @@ -119,9 +125,55 @@ impl ConversationTable { // Update conversation status from protocol state conv.update_status(); + // Track memory for TCP reassembly buffers + if self.memory_tracker.has_budget() { + if matches!(conv.protocol_state, ProtocolState::Tcp(_)) { + // Track bytes that TCP payload may have added + let tcp_payload_len = packet.tcp().map_or(0, |tcp| { + let data_offset = tcp.data_offset(buf).unwrap_or(5) as usize * 4; + let payload_start = tcp.index.start + data_offset; + buf.len().saturating_sub(payload_start) + }); + if tcp_payload_len > 0 { + self.memory_tracker.add(tcp_payload_len); + } + } + } + + // Drop the entry lock before spilling (which needs iter_mut) + drop(entry); + + // Spill if over budget + if self.memory_tracker.is_over_budget() { + self.maybe_spill(); + } + Ok(()) } + /// Spill the largest reassembly buffers to disk until under budget. + fn maybe_spill(&self) { + for mut entry in self.conversations.iter_mut() { + if !self.memory_tracker.is_over_budget() { + break; + } + if let ProtocolState::Tcp(ref mut tcp_state) = entry.value_mut().protocol_state { + let freed_fwd = tcp_state + .reassembler_fwd + .spill(self.config.spill_dir.as_deref()) + .unwrap_or(0); + let freed_rev = tcp_state + .reassembler_rev + .spill(self.config.spill_dir.as_deref()) + .unwrap_or(0); + let total_freed = freed_fwd + freed_rev; + if total_freed > 0 { + self.memory_tracker.subtract(total_freed); + } + } + } + } + /// Get a read reference to a specific conversation. 
#[must_use] pub fn get_conversation( diff --git a/crates/stackforge-core/src/flow/tcp_reassembly.rs b/crates/stackforge-core/src/flow/tcp_reassembly.rs index a71af00..752634c 100644 --- a/crates/stackforge-core/src/flow/tcp_reassembly.rs +++ b/crates/stackforge-core/src/flow/tcp_reassembly.rs @@ -1,7 +1,9 @@ use std::collections::BTreeMap; +use std::path::Path; use super::config::FlowConfig; use super::error::FlowError; +use super::spill::ReassemblyStorage; /// Result of processing a TCP segment through the reassembly engine. #[derive(Debug, Clone, PartialEq, Eq)] @@ -29,8 +31,8 @@ pub struct TcpReassembler { segments: BTreeMap<u32, Vec<u8>>, /// Next expected sequence number (advanced as data arrives in-order). next_expected_seq: u32, - /// Contiguous reassembled byte stream. - reassembled: Vec<u8>, + /// Contiguous reassembled byte stream (may be in memory or on disk). + reassembled: ReassemblyStorage, /// Total bytes currently buffered in out-of-order cache. total_buffered: usize, /// Number of distinct out-of-order fragments. @@ -46,7 +48,7 @@ impl TcpReassembler { Self { segments: BTreeMap::new(), next_expected_seq: 0, - reassembled: Vec::new(), + reassembled: ReassemblyStorage::new(), total_buffered: 0, fragment_count: 0, initialized: false, @@ -65,15 +67,25 @@ impl TcpReassembler { self.initialized } - /// Get the contiguous reassembled data accumulated so far. + /// Get the contiguous reassembled data if it's in memory. + /// + /// Returns `None` if the data has been spilled to disk. + /// Use [`read_reassembled`] for guaranteed access regardless of storage location. #[must_use] pub fn reassembled_data(&self) -> &[u8] { - &self.reassembled + self.reassembled.as_slice().unwrap_or(&[]) + } + + /// Read the reassembled data regardless of storage location. + /// + /// Works for both in-memory and spilled-to-disk data. + pub fn read_reassembled(&self) -> std::io::Result<Vec<u8>> { + self.reassembled.read_all() } /// Drain and return the reassembled data, resetting the buffer.
- pub fn drain_reassembled(&mut self) -> Vec<u8> { - std::mem::take(&mut self.reassembled) + pub fn drain_reassembled(&mut self) -> std::io::Result<Vec<u8>> { + self.reassembled.drain() } /// Total bytes in the out-of-order buffer. @@ -88,6 +100,25 @@ self.fragment_count } + /// Total bytes currently held in memory (reassembled + OOO segments). + #[must_use] + pub fn in_memory_bytes(&self) -> usize { + self.reassembled.in_memory_bytes() + self.total_buffered + } + + /// Spill reassembled data to a temporary file on disk. + /// + /// Returns the number of bytes freed from memory. + pub fn spill(&mut self, spill_dir: Option<&Path>) -> std::io::Result<usize> { + self.reassembled.spill_to_disk(spill_dir) + } + + /// Whether the reassembled data has been spilled to disk. + #[must_use] + pub fn is_spilled(&self) -> bool { + self.reassembled.is_spilled() + } + + /// Process an incoming TCP segment. /// /// Handles in-order, out-of-order, overlapping, and duplicate segments @@ -158,9 +189,7 @@ /// Drain contiguous segments from the `BTreeMap` that can now be appended.
fn try_drain_buffered(&mut self) { - // Collect keys to drain (can't mutate while iterating) loop { - // Find the first segment that starts at or before next_expected_seq let key = { let entry = self.segments.range(..=self.next_expected_seq).next_back(); match entry { @@ -169,29 +198,24 @@ impl TcpReassembler { } }; - // Remove and process this segment if let Some(data) = self.segments.remove(&key) { let seg_end = key.wrapping_add(data.len() as u32); self.total_buffered -= data.len(); self.fragment_count -= 1; - // Check if this segment extends beyond next_expected_seq if self.seq_after(seg_end, self.next_expected_seq) { if self.seq_before(key, self.next_expected_seq) { - // Partial overlap with already-received data let overlap = self.next_expected_seq.wrapping_sub(key) as usize; if overlap < data.len() { self.reassembled.extend_from_slice(&data[overlap..]); self.next_expected_seq = seg_end; } } else { - // key == next_expected_seq (perfectly aligned) self.reassembled.extend_from_slice(&data); self.next_expected_seq = seg_end; } } - // else: segment is fully behind, skip it (duplicate) } } } @@ -248,15 +272,12 @@ mod tests { let mut r = TcpReassembler::new(); r.initialize(1000); - // Segment 2 arrives first (out of order) let action = r.process_segment(1005, b" world", &config).unwrap(); assert_eq!(action, ReassemblyAction::Buffered); assert_eq!(r.fragment_count(), 1); - // Segment 1 arrives, fills the gap let action = r.process_segment(1000, b"hello", &config).unwrap(); assert_eq!(action, ReassemblyAction::DataReady(5)); - // The buffered segment should have been drained assert_eq!(r.reassembled_data(), b"hello world"); assert_eq!(r.fragment_count(), 0); } @@ -280,7 +301,6 @@ mod tests { r.initialize(1000); r.process_segment(1000, b"hello", &config).unwrap(); - // Overlapping: starts at 1003, overlaps 2 bytes, adds 3 new let action = r.process_segment(1003, b"lo wo", &config).unwrap(); assert_eq!(action, ReassemblyAction::OverlapTrimmed(3)); 
assert_eq!(r.reassembled_data(), b"hello wo"); @@ -329,12 +349,10 @@ mod tests { let mut r = TcpReassembler::new(); r.initialize(100); - // Send segments 3, 2, then 1 r.process_segment(110, b"ccc", &config).unwrap(); r.process_segment(105, b"bbbbb", &config).unwrap(); assert_eq!(r.fragment_count(), 2); - // Fill the gap with segment 1 r.process_segment(100, b"aaaaa", &config).unwrap(); assert_eq!(r.reassembled_data(), b"aaaaabbbbbccc"); assert_eq!(r.fragment_count(), 0); @@ -345,7 +363,6 @@ mod tests { let config = default_config(); let mut r = TcpReassembler::new(); - // Should auto-initialize on first segment let action = r.process_segment(5000, b"data", &config).unwrap(); assert_eq!(action, ReassemblyAction::DataReady(4)); assert!(r.is_initialized()); @@ -359,8 +376,26 @@ mod tests { r.initialize(0); r.process_segment(0, b"hello", &config).unwrap(); - let data = r.drain_reassembled(); + let data = r.drain_reassembled().unwrap(); assert_eq!(data, b"hello"); assert!(r.reassembled_data().is_empty()); } + + #[test] + fn test_spill_and_read() { + let config = default_config(); + let mut r = TcpReassembler::new(); + r.initialize(0); + + r.process_segment(0, b"spill test data", &config).unwrap(); + assert_eq!(r.in_memory_bytes(), 15); + + let freed = r.spill(None).unwrap(); + assert_eq!(freed, 15); + assert!(r.is_spilled()); + assert_eq!(r.in_memory_bytes(), 0); + + let data = r.read_reassembled().unwrap(); + assert_eq!(data, b"spill test data"); + } } diff --git a/crates/stackforge-core/src/layer/mqttsn/builder.rs b/crates/stackforge-core/src/layer/mqttsn/builder.rs index 388aceb..93f37ff 100644 --- a/crates/stackforge-core/src/layer/mqttsn/builder.rs +++ b/crates/stackforge-core/src/layer/mqttsn/builder.rs @@ -687,6 +687,7 @@ impl MqttSnBuilder { #[cfg(test)] mod tests { use super::*; + use crate::layer::mqttsn::{MqttSnLayer, RC_REJ_CONGESTION, RC_REJ_INVALID_TID, TID_NORMAL}; use crate::layer::{LayerIndex, LayerKind}; fn make_layer(buf: &[u8]) -> MqttSnLayer { diff 
--git a/crates/stackforge-core/src/lib.rs b/crates/stackforge-core/src/lib.rs index bb31013..1e2c257 100644 --- a/crates/stackforge-core/src/lib.rs +++ b/crates/stackforge-core/src/lib.rs @@ -160,14 +160,16 @@ pub use layer::{ }; pub use packet::Packet; pub use pcap::{ - CapturedPacket, LinkType, PcapIterator, PcapMetadata, rdpcap, wrpcap, wrpcap_packets, + CaptureFormat, CaptureIterator, CapturedPacket, LinkType, PcapIterator, PcapMetadata, + PcapNgIterator, PcapNgStreamWriter, rdpcap, wrpcap, wrpcap_packets, wrpcapng, wrpcapng_packets, }; // Flow extraction re-exports pub use flow::{ CanonicalKey, ConversationState, ConversationStatus, ConversationTable, DirectionStats, FlowConfig, FlowDirection, FlowError, ProtocolState, TransportProtocol, ZWaveFlowState, - ZWaveKey, extract_flows, extract_flows_with_config, extract_zwave_flows, + ZWaveKey, extract_flows, extract_flows_from_file, extract_flows_streaming, + extract_flows_with_config, extract_zwave_flows, }; // Utils re-exports diff --git a/crates/stackforge-core/src/pcap/mod.rs b/crates/stackforge-core/src/pcap/mod.rs index 8aaa3d5..cc6e1c6 100644 --- a/crates/stackforge-core/src/pcap/mod.rs +++ b/crates/stackforge-core/src/pcap/mod.rs @@ -1,7 +1,8 @@ -//! PCAP file I/O for reading and writing packet captures. +//! PCAP and PcapNG file I/O for reading and writing packet captures. //! -//! Provides `rdpcap` for reading all packets from a file, -//! `PcapIterator` for streaming large captures, and `wrpcap` for writing. +//! Provides `rdpcap` for reading all packets from a file (auto-detects format), +//! `PcapIterator` / `PcapNgIterator` / `CaptureIterator` for streaming, +//! and `wrpcap` / `wrpcapng` for writing. pub mod reader; pub mod writer; @@ -10,13 +11,37 @@ use std::time::Duration; use crate::Packet; -/// Metadata from a PCAP packet record. -#[derive(Debug, Clone, Default)] +/// Capture file format. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CaptureFormat { + /// Classic PCAP format (`.pcap`). 
+ Pcap, + /// PcapNG format (`.pcapng`). + PcapNg, +} + +/// Metadata from a PCAP/PcapNG packet record. +#[derive(Debug, Clone)] pub struct PcapMetadata { /// Timestamp of when the packet was captured. pub timestamp: Duration, /// Original length of the packet on the wire (may be larger than captured data). pub orig_len: u32, + /// PcapNG interface ID (None for classic PCAP). + pub interface_id: Option, + /// PcapNG per-packet comment (None for classic PCAP). + pub comment: Option, +} + +impl Default for PcapMetadata { + fn default() -> Self { + Self { + timestamp: Duration::ZERO, + orig_len: 0, + interface_id: None, + comment: None, + } + } } /// A captured packet with associated PCAP metadata. @@ -38,5 +63,5 @@ impl LinkType { pub const LINUX_SLL: Self = Self(113); } -pub use reader::{PcapIterator, rdpcap}; -pub use writer::{wrpcap, wrpcap_packets}; +pub use reader::{CaptureIterator, PcapIterator, PcapNgIterator, rdpcap}; +pub use writer::{PcapNgStreamWriter, wrpcap, wrpcap_packets, wrpcapng, wrpcapng_packets}; diff --git a/crates/stackforge-core/src/pcap/reader.rs b/crates/stackforge-core/src/pcap/reader.rs index 2515628..8a2e0b6 100644 --- a/crates/stackforge-core/src/pcap/reader.rs +++ b/crates/stackforge-core/src/pcap/reader.rs @@ -1,36 +1,63 @@ -//! PCAP file reader with streaming support. +//! PCAP and PcapNG file reader with streaming support and auto-detection. use std::fs::File; -use std::io::{BufReader, Read}; -use std::path::Path; +use std::io::{BufReader, Cursor, Read, Seek, SeekFrom}; use bytes::Bytes; use pcap_file::pcap::PcapReader as PcapFileReader; +use pcap_file::pcapng::Block; +use pcap_file::pcapng::PcapNgReader as PcapNgFileReader; use crate::error::{PacketError, Result}; use crate::packet::Packet; -use super::{CapturedPacket, LinkType, PcapMetadata}; +use super::{CaptureFormat, CapturedPacket, LinkType, PcapMetadata}; + +/// Detect capture format from the first 4 bytes (magic number). 
+fn detect_format(magic: &[u8; 4]) -> Result { + let le = u32::from_le_bytes(*magic); + let be = u32::from_be_bytes(*magic); + + // PcapNG Section Header Block type is 0x0A0D0D0A in both endiannesses + if le == 0x0A0D_0D0A || be == 0x0A0D_0D0A { + return Ok(CaptureFormat::PcapNg); + } + + // PCAP magic: microsecond or nanosecond, either endian + if le == 0xA1B2_C3D4 || be == 0xA1B2_C3D4 || le == 0xA1B2_3C4D || be == 0xA1B2_3C4D { + return Ok(CaptureFormat::Pcap); + } + + Err(PacketError::Io( + "unknown capture file format (not PCAP or PcapNG)".into(), + )) +} -/// Read all packets from a PCAP file into memory. +/// Read all packets from a PCAP or PcapNG file into memory. /// -/// This is the simple Scapy-like API. For large files, use [`PcapIterator`] instead. +/// Auto-detects the file format from magic bytes. +/// This is the simple Scapy-like API. For large files, use [`CaptureIterator`] instead. pub fn rdpcap(path: impl AsRef) -> Result> { - let iter = PcapIterator::open(path)?; + let iter = CaptureIterator::open(&path)?; iter.collect() } -/// Streaming iterator over packets in a PCAP file. +use std::path::Path; + +// --------------------------------------------------------------------------- +// Classic PCAP iterator +// --------------------------------------------------------------------------- + +/// Streaming iterator over packets in a classic PCAP file. /// /// Reads packets one at a time, suitable for gigabyte-sized captures. -/// Implements `Iterator>`. pub struct PcapIterator { inner: PcapFileReader, link_type: LinkType, } impl PcapIterator> { - /// Open a PCAP file for streaming iteration. + /// Open a classic PCAP file for streaming iteration. 
pub fn open(path: impl AsRef) -> Result { let file = File::open(path.as_ref()).map_err(|e| { PacketError::Io(format!("failed to open {}: {}", path.as_ref().display(), e)) @@ -67,13 +94,14 @@ impl Iterator for PcapIterator { let ts = pcap_pkt.timestamp; let data = Bytes::copy_from_slice(&pcap_pkt.data); let mut pkt = Packet::from_bytes(data); - // Auto-parse; ignore parse errors (store as unparsed) let _ = pkt.parse(); Some(Ok(CapturedPacket { packet: pkt, metadata: PcapMetadata { timestamp: ts, orig_len: pcap_pkt.orig_len, + interface_id: None, + comment: None, }, })) }, @@ -83,10 +111,193 @@ impl Iterator for PcapIterator { } } +// --------------------------------------------------------------------------- +// PcapNG iterator +// --------------------------------------------------------------------------- + +/// Streaming iterator over packets in a PcapNG file. +/// +/// Reads packets one at a time, skipping non-packet blocks (SHB, IDB, etc.). +pub struct PcapNgIterator { + inner: PcapNgFileReader, +} + +impl PcapNgIterator> { + /// Open a PcapNG file for streaming iteration. + pub fn open(path: impl AsRef) -> Result { + let file = File::open(path.as_ref()).map_err(|e| { + PacketError::Io(format!("failed to open {}: {}", path.as_ref().display(), e)) + })?; + let reader = BufReader::new(file); + Self::from_reader(reader) + } +} + +impl PcapNgIterator { + /// Create a `PcapNgIterator` from any reader. + pub fn from_reader(reader: R) -> Result { + let ng_reader = PcapNgFileReader::new(reader) + .map_err(|e| PacketError::Io(format!("invalid PcapNG: {e}")))?; + Ok(Self { inner: ng_reader }) + } + + /// Returns the link type of the first interface (most common case). 
+ pub fn link_type(&self) -> LinkType { + self.inner + .interfaces() + .first() + .map(|idb| LinkType(u32::from(idb.linktype))) + .unwrap_or(LinkType::ETHERNET) + } +} + +impl Iterator for PcapNgIterator { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + match self.inner.next_block() { + Some(Ok(block)) => { + match block { + Block::EnhancedPacket(epb) => { + let data = Bytes::copy_from_slice(&epb.data); + let mut pkt = Packet::from_bytes(data); + let _ = pkt.parse(); + return Some(Ok(CapturedPacket { + packet: pkt, + metadata: PcapMetadata { + timestamp: epb.timestamp, + orig_len: epb.original_len, + interface_id: Some(epb.interface_id), + comment: None, + }, + })); + }, + Block::SimplePacket(spb) => { + let data = Bytes::copy_from_slice(&spb.data); + let mut pkt = Packet::from_bytes(data); + let _ = pkt.parse(); + return Some(Ok(CapturedPacket { + packet: pkt, + metadata: PcapMetadata { + timestamp: std::time::Duration::ZERO, + orig_len: spb.original_len, + interface_id: Some(0), + comment: None, + }, + })); + }, + // Skip non-packet blocks (SHB, IDB, NRB, ISB, etc.) + _ => continue, + } + }, + Some(Err(e)) => { + return Some(Err(PacketError::Io(format!("PcapNG read error: {e}")))); + }, + None => return None, + } + } + } +} + +// --------------------------------------------------------------------------- +// Unified auto-detecting iterator +// --------------------------------------------------------------------------- + +/// Auto-detecting iterator over packets from either PCAP or PcapNG files. +/// +/// Detects the format from magic bytes and delegates to the appropriate reader. +pub enum CaptureIterator { + /// Classic PCAP format. + Pcap(PcapIterator), + /// PcapNG format. + PcapNg(PcapNgIterator), +} + +impl CaptureIterator> { + /// Open any capture file, auto-detecting format from magic bytes. 
+ pub fn open(path: impl AsRef) -> Result { + let mut file = File::open(path.as_ref()).map_err(|e| { + PacketError::Io(format!("failed to open {}: {}", path.as_ref().display(), e)) + })?; + + let mut magic = [0u8; 4]; + file.read_exact(&mut magic).map_err(|e| { + PacketError::Io(format!( + "failed to read magic bytes from {}: {e}", + path.as_ref().display() + )) + })?; + + let format = detect_format(&magic)?; + + // Seek back to start so the reader sees the full header + file.seek(SeekFrom::Start(0)) + .map_err(|e| PacketError::Io(format!("failed to seek: {e}")))?; + + let reader = BufReader::new(file); + match format { + CaptureFormat::Pcap => Ok(Self::Pcap(PcapIterator::from_reader(reader)?)), + CaptureFormat::PcapNg => Ok(Self::PcapNg(PcapNgIterator::from_reader(reader)?)), + } + } +} + +impl CaptureIterator { + /// Create from a reader by reading 4 magic bytes, then chaining them back. + /// + /// For readers that don't support `Seek`, this reads the magic bytes and + /// chains them back using `Cursor::chain`. + pub fn from_reader( + mut reader: R, + ) -> Result, R>>> { + let mut magic = [0u8; 4]; + reader + .read_exact(&mut magic) + .map_err(|e| PacketError::Io(format!("failed to read magic bytes: {e}")))?; + + let format = detect_format(&magic)?; + let chain = Cursor::new(magic).chain(reader); + + match format { + CaptureFormat::Pcap => Ok(CaptureIterator::Pcap(PcapIterator::from_reader(chain)?)), + CaptureFormat::PcapNg => { + Ok(CaptureIterator::PcapNg(PcapNgIterator::from_reader(chain)?)) + }, + } + } + + /// Returns the link-layer type of the capture. + pub fn link_type(&self) -> LinkType { + match self { + Self::Pcap(p) => p.link_type(), + Self::PcapNg(p) => p.link_type(), + } + } + + /// Returns the detected capture format. 
+ pub fn format(&self) -> CaptureFormat { + match self { + Self::Pcap(_) => CaptureFormat::Pcap, + Self::PcapNg(_) => CaptureFormat::PcapNg, + } + } +} + +impl Iterator for CaptureIterator { + type Item = Result; + + fn next(&mut self) -> Option { + match self { + Self::Pcap(iter) => iter.next(), + Self::PcapNg(iter) => iter.next(), + } + } +} + #[cfg(test)] mod tests { use super::*; - use std::io::Cursor; use std::time::Duration; use pcap_file::pcap::{PcapHeader, PcapPacket, PcapWriter as PcapFileWriter}; @@ -113,6 +324,57 @@ mod tests { buf } + fn create_test_pcapng(packets: &[(Duration, &[u8])]) -> Vec { + use pcap_file::pcapng::PcapNgWriter; + use pcap_file::pcapng::blocks::enhanced_packet::EnhancedPacketBlock; + use pcap_file::pcapng::blocks::interface_description::InterfaceDescriptionBlock; + use std::borrow::Cow; + + let mut buf = Vec::new(); + let mut writer = PcapNgWriter::new(Cursor::new(&mut buf)).unwrap(); + + // Write interface description block + let idb = InterfaceDescriptionBlock { + linktype: pcap_file::DataLink::ETHERNET, + snaplen: 0xFFFF, + options: vec![], + }; + writer.write_pcapng_block(idb).unwrap(); + + for (ts, data) in packets { + let epb = EnhancedPacketBlock { + interface_id: 0, + timestamp: *ts, + original_len: data.len() as u32, + data: Cow::Borrowed(data), + options: vec![], + }; + writer.write_pcapng_block(epb).unwrap(); + } + drop(writer); + buf + } + + #[test] + fn test_detect_format_pcap() { + // Little-endian PCAP magic + let magic = [0xD4, 0xC3, 0xB2, 0xA1]; + assert_eq!(detect_format(&magic).unwrap(), CaptureFormat::Pcap); + } + + #[test] + fn test_detect_format_pcapng() { + // PcapNG SHB magic + let magic = [0x0A, 0x0D, 0x0D, 0x0A]; + assert_eq!(detect_format(&magic).unwrap(), CaptureFormat::PcapNg); + } + + #[test] + fn test_detect_format_unknown() { + let magic = [0x00, 0x00, 0x00, 0x00]; + assert!(detect_format(&magic).is_err()); + } + #[test] fn test_pcap_iterator_from_reader() { let eth = sample_ethernet_packet(); @@ 
-125,6 +387,8 @@ mod tests { assert_eq!(packets.len(), 2); assert_eq!(packets[0].metadata.timestamp, Duration::from_secs(1)); assert_eq!(packets[1].metadata.timestamp, Duration::from_secs(2)); + // Classic PCAP should have no interface_id + assert!(packets[0].metadata.interface_id.is_none()); } #[test] @@ -162,11 +426,71 @@ mod tests { (Duration::from_secs(3), ð), ]); let mut iter = PcapIterator::from_reader(Cursor::new(pcap_data)).unwrap(); - // Only read one packet let first = iter.next().unwrap().unwrap(); assert_eq!(first.metadata.timestamp, Duration::from_secs(1)); - // Iterator still has more let second = iter.next().unwrap().unwrap(); assert_eq!(second.metadata.timestamp, Duration::from_secs(2)); } + + #[test] + fn test_pcapng_iterator_from_reader() { + let eth = sample_ethernet_packet(); + let pcapng_data = create_test_pcapng(&[ + (Duration::from_secs(10), ð), + (Duration::from_secs(20), ð), + ]); + let iter = PcapNgIterator::from_reader(Cursor::new(pcapng_data)).unwrap(); + let packets: Vec<_> = iter.collect::, _>>().unwrap(); + assert_eq!(packets.len(), 2); + assert_eq!(packets[0].metadata.timestamp, Duration::from_secs(10)); + assert_eq!(packets[1].metadata.timestamp, Duration::from_secs(20)); + assert_eq!(packets[0].metadata.interface_id, Some(0)); + } + + #[test] + fn test_pcapng_iterator_empty() { + let pcapng_data = create_test_pcapng(&[]); + let iter = PcapNgIterator::from_reader(Cursor::new(pcapng_data)).unwrap(); + let packets: Vec<_> = iter.collect::, _>>().unwrap(); + assert!(packets.is_empty()); + } + + #[test] + fn test_capture_iterator_auto_detect_pcap() { + let eth = sample_ethernet_packet(); + let pcap_data = create_test_pcap(&[(Duration::from_secs(1), ð)]); + let iter = CaptureIterator::from_reader(Cursor::new(pcap_data)).unwrap(); + assert_eq!(iter.format(), CaptureFormat::Pcap); + let packets: Vec<_> = iter.collect::, _>>().unwrap(); + assert_eq!(packets.len(), 1); + } + + #[test] + fn test_capture_iterator_auto_detect_pcapng() { + let 
eth = sample_ethernet_packet(); + let pcapng_data = create_test_pcapng(&[(Duration::from_secs(5), ð)]); + let iter = CaptureIterator::from_reader(Cursor::new(pcapng_data)).unwrap(); + assert_eq!(iter.format(), CaptureFormat::PcapNg); + let packets: Vec<_> = iter.collect::, _>>().unwrap(); + assert_eq!(packets.len(), 1); + assert_eq!(packets[0].metadata.timestamp, Duration::from_secs(5)); + } + + #[test] + fn test_rdpcap_pcapng_roundtrip() { + let eth = sample_ethernet_packet(); + let pcapng_data = create_test_pcapng(&[ + (Duration::from_secs(1), ð), + (Duration::from_secs(2), ð), + (Duration::from_secs(3), ð), + ]); + // Write to temp file and read back with rdpcap + let tmpdir = tempfile::tempdir().unwrap(); + let path = tmpdir.path().join("test.pcapng"); + std::fs::write(&path, &pcapng_data).unwrap(); + let packets = rdpcap(&path).unwrap(); + assert_eq!(packets.len(), 3); + assert_eq!(packets[0].metadata.timestamp, Duration::from_secs(1)); + assert_eq!(packets[2].metadata.timestamp, Duration::from_secs(3)); + } } diff --git a/crates/stackforge-core/src/pcap/writer.rs b/crates/stackforge-core/src/pcap/writer.rs index ea38435..f9ea61e 100644 --- a/crates/stackforge-core/src/pcap/writer.rs +++ b/crates/stackforge-core/src/pcap/writer.rs @@ -1,17 +1,25 @@ -//! PCAP file writer. +//! PCAP and PcapNG file writer. 
+use std::borrow::Cow; use std::fs::File; use std::io::{BufWriter, Write}; use std::path::Path; use std::time::Duration; use pcap_file::pcap::{PcapHeader, PcapPacket, PcapWriter as PcapFileWriter}; +use pcap_file::pcapng::PcapNgWriter as PcapNgFileWriter; +use pcap_file::pcapng::blocks::enhanced_packet::EnhancedPacketBlock; +use pcap_file::pcapng::blocks::interface_description::InterfaceDescriptionBlock; use crate::error::{PacketError, Result}; use crate::packet::Packet; use super::{CapturedPacket, PcapMetadata}; +// --------------------------------------------------------------------------- +// Classic PCAP writer +// --------------------------------------------------------------------------- + /// Write captured packets to a PCAP file. /// /// Preserves timestamps and original length from [`CapturedPacket`] metadata. @@ -54,6 +62,7 @@ pub fn wrpcap_packets(path: impl AsRef, packets: &[Packet]) -> Result<()> metadata: PcapMetadata { timestamp: Duration::ZERO, orig_len: pkt.len() as u32, + ..Default::default() }, }) .collect(); @@ -114,10 +123,138 @@ impl PcapStreamWriter { } } +// --------------------------------------------------------------------------- +// PcapNG writer +// --------------------------------------------------------------------------- + +/// Write captured packets to a PcapNG file. +/// +/// Writes SHB + IDB (Ethernet) header, then each packet as an Enhanced Packet Block. +pub fn wrpcapng(path: impl AsRef, packets: &[CapturedPacket]) -> Result<()> { + let file = File::create(path.as_ref()).map_err(|e| { + PacketError::Io(format!( + "failed to create {}: {}", + path.as_ref().display(), + e + )) + })?; + let writer = BufWriter::new(file); + let mut ng_writer = PcapNgStreamWriter::from_writer(writer)?; + + for cap in packets { + ng_writer.write(cap)?; + } + + Ok(()) +} + +/// Write plain packets to a PcapNG file (convenience function). +/// +/// Timestamps are set to zero, `orig_len` matches each packet's data length. 
+pub fn wrpcapng_packets(path: impl AsRef, packets: &[Packet]) -> Result<()> { + let captured: Vec = packets + .iter() + .map(|pkt| CapturedPacket { + packet: pkt.clone(), + metadata: PcapMetadata { + timestamp: Duration::ZERO, + orig_len: pkt.len() as u32, + ..Default::default() + }, + }) + .collect(); + wrpcapng(path, &captured) +} + +/// PcapNG writer for streaming writes. +/// +/// Writes packets one at a time. Auto-writes SHB + IDB on first packet. +pub struct PcapNgStreamWriter { + inner: PcapNgFileWriter, + interface_written: bool, +} + +impl PcapNgStreamWriter> { + /// Create a new PcapNG file for writing. + pub fn create(path: impl AsRef) -> Result { + let file = File::create(path.as_ref()).map_err(|e| { + PacketError::Io(format!( + "failed to create {}: {}", + path.as_ref().display(), + e + )) + })?; + let writer = BufWriter::new(file); + Self::from_writer(writer) + } +} + +impl PcapNgStreamWriter { + /// Create a `PcapNgStreamWriter` from any writer. + /// + /// The SHB is written immediately. The IDB is written on the first packet. + pub fn from_writer(writer: W) -> Result { + let ng_writer = PcapNgFileWriter::new(writer) + .map_err(|e| PacketError::Io(format!("PcapNG write error: {e}")))?; + Ok(Self { + inner: ng_writer, + interface_written: false, + }) + } + + /// Ensure at least one Interface Description Block has been written. + fn ensure_interface(&mut self) -> Result<()> { + if !self.interface_written { + let idb = InterfaceDescriptionBlock { + linktype: pcap_file::DataLink::ETHERNET, + snaplen: 0xFFFF, + options: vec![], + }; + self.inner + .write_pcapng_block(idb) + .map_err(|e| PacketError::Io(format!("PcapNG write error: {e}")))?; + self.interface_written = true; + } + Ok(()) + } + + /// Write a captured packet with metadata as an Enhanced Packet Block. 
+ pub fn write(&mut self, cap: &CapturedPacket) -> Result<()> { + self.ensure_interface()?; + let epb = EnhancedPacketBlock { + interface_id: cap.metadata.interface_id.unwrap_or(0), + timestamp: cap.metadata.timestamp, + original_len: cap.metadata.orig_len, + data: Cow::Borrowed(cap.packet.as_bytes()), + options: vec![], + }; + self.inner + .write_pcapng_block(epb) + .map_err(|e| PacketError::Io(format!("PcapNG write error: {e}")))?; + Ok(()) + } + + /// Write a plain packet (timestamp=0, `orig_len=data` length). + pub fn write_packet(&mut self, pkt: &Packet) -> Result<()> { + self.ensure_interface()?; + let epb = EnhancedPacketBlock { + interface_id: 0, + timestamp: Duration::ZERO, + original_len: pkt.len() as u32, + data: Cow::Borrowed(pkt.as_bytes()), + options: vec![], + }; + self.inner + .write_pcapng_block(epb) + .map_err(|e| PacketError::Io(format!("PcapNG write error: {e}")))?; + Ok(()) + } +} + #[cfg(test)] mod tests { use super::*; - use crate::pcap::reader::PcapIterator; + use crate::pcap::reader::{PcapIterator, PcapNgIterator}; use std::io::Cursor; use std::time::Duration; @@ -139,17 +276,16 @@ mod tests { metadata: PcapMetadata { timestamp: Duration::from_secs(42), orig_len: eth.len() as u32, + ..Default::default() }, }; - // Write to in-memory buffer via PcapStreamWriter let mut buf = Vec::new(); { let mut writer = PcapStreamWriter::from_writer(Cursor::new(&mut buf)).unwrap(); writer.write(&cap).unwrap(); } - // Read back let iter = PcapIterator::from_reader(Cursor::new(buf)).unwrap(); let packets: Vec<_> = iter.collect::, _>>().unwrap(); assert_eq!(packets.len(), 1); @@ -167,6 +303,7 @@ mod tests { metadata: PcapMetadata { timestamp: Duration::from_secs(i), orig_len: eth.len() as u32, + ..Default::default() }, }) .collect(); @@ -204,4 +341,79 @@ mod tests { assert_eq!(packets[0].metadata.timestamp, Duration::ZERO); assert_eq!(packets[0].metadata.orig_len, eth.len() as u32); } + + #[test] + fn test_pcapng_writer_roundtrip() { + let eth = 
sample_ethernet_packet(); + let cap = CapturedPacket { + packet: Packet::from_bytes(bytes::Bytes::copy_from_slice(ð)), + metadata: PcapMetadata { + timestamp: Duration::from_secs(100), + orig_len: eth.len() as u32, + interface_id: Some(0), + comment: None, + }, + }; + + let mut buf = Vec::new(); + { + let mut writer = PcapNgStreamWriter::from_writer(Cursor::new(&mut buf)).unwrap(); + writer.write(&cap).unwrap(); + } + + let iter = PcapNgIterator::from_reader(Cursor::new(buf)).unwrap(); + let packets: Vec<_> = iter.collect::, _>>().unwrap(); + assert_eq!(packets.len(), 1); + assert_eq!(packets[0].metadata.timestamp, Duration::from_secs(100)); + assert_eq!(packets[0].packet.as_bytes(), eth.as_slice()); + assert_eq!(packets[0].metadata.interface_id, Some(0)); + } + + #[test] + fn test_pcapng_writer_multiple_packets() { + let eth = sample_ethernet_packet(); + + let caps: Vec = (0..3) + .map(|i| CapturedPacket { + packet: Packet::from_bytes(bytes::Bytes::copy_from_slice(ð)), + metadata: PcapMetadata { + timestamp: Duration::from_secs(i * 10), + orig_len: eth.len() as u32, + ..Default::default() + }, + }) + .collect(); + + let mut buf = Vec::new(); + { + let mut writer = PcapNgStreamWriter::from_writer(Cursor::new(&mut buf)).unwrap(); + for cap in &caps { + writer.write(cap).unwrap(); + } + } + + let iter = PcapNgIterator::from_reader(Cursor::new(buf)).unwrap(); + let packets: Vec<_> = iter.collect::, _>>().unwrap(); + assert_eq!(packets.len(), 3); + assert_eq!(packets[0].metadata.timestamp, Duration::from_secs(0)); + assert_eq!(packets[1].metadata.timestamp, Duration::from_secs(10)); + assert_eq!(packets[2].metadata.timestamp, Duration::from_secs(20)); + } + + #[test] + fn test_pcapng_write_packet_convenience() { + let eth = sample_ethernet_packet(); + let pkt = Packet::from_bytes(bytes::Bytes::copy_from_slice(ð)); + + let mut buf = Vec::new(); + { + let mut writer = PcapNgStreamWriter::from_writer(Cursor::new(&mut buf)).unwrap(); + writer.write_packet(&pkt).unwrap(); + 
} + + let iter = PcapNgIterator::from_reader(Cursor::new(buf)).unwrap(); + let packets: Vec<_> = iter.collect::, _>>().unwrap(); + assert_eq!(packets.len(), 1); + assert_eq!(packets[0].metadata.timestamp, Duration::ZERO); + } } diff --git a/python/stackforge/__init__.py b/python/stackforge/__init__.py index b49b4be..b31dacb 100644 --- a/python/stackforge/__init__.py +++ b/python/stackforge/__init__.py @@ -41,6 +41,7 @@ extract_flows_from_packets, rdpcap, wrpcap, + wrpcapng, ) from .custom import ( @@ -89,6 +90,7 @@ # PCAP I/O "rdpcap", "wrpcap", + "wrpcapng", "PcapPacket", "PcapReader", # Flow extraction diff --git a/src/lib.rs b/src/lib.rs index 41a87bb..74aab43 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3849,6 +3849,12 @@ impl PyPcapPacket { self.inner.metadata.orig_len } + /// Get the PcapNG interface ID (None for classic PCAP). + #[getter] + fn interface_id(&self) -> Option { + self.inner.metadata.interface_id + } + fn show(&self) -> String { show_packet(&self.inner.packet) } @@ -3888,14 +3894,14 @@ impl PyPcapPacket { /// ... 
print(pkt.summary()) #[pyclass(name = "PcapReader")] pub struct PyPcapReader { - inner: Option>>, + inner: Option>>, } #[pymethods] impl PyPcapReader { #[new] fn new(filename: &str) -> PyResult { - let iter = stackforge_core::PcapIterator::open(filename) + let iter = stackforge_core::CaptureIterator::open(filename) .map_err(|e| pyo3::exceptions::PyIOError::new_err(format!("{e}")))?; Ok(Self { inner: Some(iter) }) } @@ -3932,7 +3938,7 @@ impl PyPcapReader { #[pyfunction] #[pyo3(signature = (filename, count=0))] fn rdpcap(filename: &str, count: usize) -> PyResult> { - let iter = stackforge_core::PcapIterator::open(filename) + let iter = stackforge_core::CaptureIterator::open(filename) .map_err(|e| pyo3::exceptions::PyIOError::new_err(format!("{e}")))?; let results: Vec = if count > 0 { @@ -3990,7 +3996,58 @@ fn wrpcap(filename: &str, packets: Vec>) -> PyResult<()> } } - stackforge_core::wrpcap(filename, &captured) + // Auto-detect format from extension + if filename.ends_with(".pcapng") { + stackforge_core::wrpcapng(filename, &captured) + .map_err(|e| pyo3::exceptions::PyIOError::new_err(format!("{e}"))) + } else { + stackforge_core::wrpcap(filename, &captured) + .map_err(|e| pyo3::exceptions::PyIOError::new_err(format!("{e}"))) + } +} + +/// Write packets to a PcapNG file. 
+/// +/// Args: +/// filename: Path to the output PcapNG file +/// packets: List of Packet, PcapPacket, or LayerStack objects +/// +/// Example: +/// >>> wrpcapng("output.pcapng", packets) +#[pyfunction] +fn wrpcapng(filename: &str, packets: Vec>) -> PyResult<()> { + let mut captured: Vec = Vec::with_capacity(packets.len()); + + for pkt_any in &packets { + if let Ok(pcap_pkt) = pkt_any.extract::() { + captured.push(pcap_pkt.inner); + } else if let Ok(pkt) = pkt_any.extract::>() { + let len = pkt.inner.len() as u32; + captured.push(stackforge_core::CapturedPacket { + packet: pkt.inner.clone(), + metadata: stackforge_core::PcapMetadata { + orig_len: len, + ..Default::default() + }, + }); + } else if let Ok(stack) = pkt_any.extract::() { + let built = stack.inner.build_packet(); + let len = built.len() as u32; + captured.push(stackforge_core::CapturedPacket { + packet: built, + metadata: stackforge_core::PcapMetadata { + orig_len: len, + ..Default::default() + }, + }); + } else { + return Err(pyo3::exceptions::PyTypeError::new_err( + "packets must be Packet, PcapPacket, or LayerStack objects", + )); + } + } + + stackforge_core::wrpcapng(filename, &captured) .map_err(|e| pyo3::exceptions::PyIOError::new_err(format!("{e}"))) } @@ -4017,7 +4074,10 @@ impl PyFlowConfig { max_ooo_fragments=100, track_max_packet_len=false, track_max_flow_len=false, + memory_budget=None, + spill_dir=None, ))] + #[allow(clippy::too_many_arguments)] fn new( tcp_established_timeout: f64, tcp_half_open_timeout: f64, @@ -4027,6 +4087,8 @@ impl PyFlowConfig { max_ooo_fragments: usize, track_max_packet_len: bool, track_max_flow_len: bool, + memory_budget: Option, + spill_dir: Option, ) -> Self { Self { inner: stackforge_core::FlowConfig { @@ -4040,6 +4102,8 @@ impl PyFlowConfig { max_ooo_fragments, track_max_packet_len, track_max_flow_len, + memory_budget, + spill_dir: spill_dir.map(std::path::PathBuf::from), ..stackforge_core::FlowConfig::default() }, } @@ -4047,10 +4111,11 @@ impl PyFlowConfig { 
fn __repr__(&self) -> String { format!( - "FlowConfig(tcp_established_timeout={}, udp_timeout={}, max_reassembly_buffer={})", + "FlowConfig(tcp_established_timeout={}, udp_timeout={}, max_reassembly_buffer={}, memory_budget={:?})", self.inner.tcp_established_timeout.as_secs(), self.inner.udp_timeout.as_secs(), self.inner.max_reassembly_buffer, + self.inner.memory_budget, ) } } @@ -4275,11 +4340,11 @@ impl PyConversation { ) -> Option> { match &self.inner.protocol_state { stackforge_core::ProtocolState::Tcp(tcp) => { - let data = tcp.reassembler_fwd.reassembled_data(); + let data = tcp.reassembler_fwd.read_reassembled().ok()?; if data.is_empty() { None } else { - Some(pyo3::types::PyBytes::new(py, data)) + Some(pyo3::types::PyBytes::new(py, &data)) } }, _ => None, @@ -4294,11 +4359,11 @@ impl PyConversation { ) -> Option> { match &self.inner.protocol_state { stackforge_core::ProtocolState::Tcp(tcp) => { - let data = tcp.reassembler_rev.reassembled_data(); + let data = tcp.reassembler_rev.read_reassembled().ok()?; if data.is_empty() { None } else { - Some(pyo3::types::PyBytes::new(py, data)) + Some(pyo3::types::PyBytes::new(py, &data)) } }, _ => None, @@ -4329,8 +4394,16 @@ impl PyConversation { match &self.inner.protocol_state { stackforge_core::ProtocolState::Tcp(tcp) => { s.push_str(&format!(" TCP State: {}\n", tcp.conn_state)); - let fwd_len = tcp.reassembler_fwd.reassembled_data().len(); - let rev_len = tcp.reassembler_rev.reassembled_data().len(); + let fwd_len = tcp + .reassembler_fwd + .read_reassembled() + .map(|d| d.len()) + .unwrap_or(0); + let rev_len = tcp + .reassembler_rev + .read_reassembled() + .map(|d| d.len()) + .unwrap_or(0); if fwd_len > 0 || rev_len > 0 { s.push_str(&format!( " Reassembled: fwd={fwd_len} bytes, rev={rev_len} bytes\n" @@ -4380,13 +4453,21 @@ impl PyConversation { #[pyfunction] #[pyo3(signature = (pcap_path, config=None))] fn extract_flows(pcap_path: &str, config: Option) -> PyResult> { - let packets = 
stackforge_core::rdpcap(pcap_path) - .map_err(|e| pyo3::exceptions::PyIOError::new_err(format!("{e}")))?; - let flow_config = config.map(|c| c.inner).unwrap_or_default(); - let conversations = stackforge_core::extract_flows_with_config(&packets, flow_config) - .map_err(|e| pyo3::exceptions::PyRuntimeError::new_err(format!("{e}")))?; + // Use streaming extraction — never loads all packets into memory at once + let conversations = + stackforge_core::extract_flows_from_file(pcap_path, flow_config).map_err(|e| { + let msg = format!("{e}"); + if msg.contains("I/O error") + || msg.contains("failed to open") + || msg.contains("failed to create") + { + pyo3::exceptions::PyOSError::new_err(msg) + } else { + pyo3::exceptions::PyRuntimeError::new_err(msg) + } + })?; Ok(conversations .into_iter() @@ -4416,6 +4497,7 @@ fn extract_flows_from_packets( metadata: stackforge_core::PcapMetadata { timestamp: std::time::Duration::from_secs(i as u64), orig_len: pkt.inner.len() as u32, + ..Default::default() }, }) .collect(); @@ -4475,6 +4557,7 @@ fn stackforge(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_function(wrap_pyfunction!(rdpcap, m)?)?; m.add_function(wrap_pyfunction!(wrpcap, m)?)?; + m.add_function(wrap_pyfunction!(wrpcapng, m)?)?; // Flow extraction m.add_class::()?; diff --git a/tests/python/test_pcapng.py b/tests/python/test_pcapng.py new file mode 100644 index 0000000..c56cf4c --- /dev/null +++ b/tests/python/test_pcapng.py @@ -0,0 +1,201 @@ +"""Tests for PcapNG read/write support and auto-detection.""" + +import os +import tempfile + +from stackforge import ( + IP, + TCP, + UDP, + Ether, + PcapPacket, + PcapReader, + extract_flows, + rdpcap, + wrpcap, + wrpcapng, +) + + +def build_eth_ip_tcp(src_ip="10.0.0.1", dst_ip="10.0.0.2", sport=12345, dport=80): + """Build a simple Ethernet/IP/TCP packet.""" + stack = Ether() / IP(src=src_ip, dst=dst_ip) / TCP(sport=sport, dport=dport) + pkt = stack.build() + pkt.parse() + return pkt + + +def 
build_eth_ip_udp(src_ip="10.0.0.1", dst_ip="10.0.0.2", sport=5000, dport=53): + """Build a simple Ethernet/IP/UDP packet.""" + stack = Ether() / IP(src=src_ip, dst=dst_ip) / UDP(sport=sport, dport=dport) + pkt = stack.build() + pkt.parse() + return pkt + + +class TestPcapNgWrite: + """Test PcapNG file writing.""" + + def test_wrpcapng_single_packet(self): + """Write a single packet to PcapNG and read back.""" + pkt = build_eth_ip_tcp() + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) as f: + path = f.name + try: + wrpcapng(path, [pkt]) + packets = rdpcap(path) + assert len(packets) == 1 + assert packets[0].packet.src == pkt.src + assert packets[0].packet.dst == pkt.dst + finally: + os.unlink(path) + + def test_wrpcapng_multiple_packets(self): + """Write multiple packets to PcapNG and read back.""" + pkts = [build_eth_ip_tcp(sport=1000 + i, dport=80) for i in range(5)] + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) as f: + path = f.name + try: + wrpcapng(path, pkts) + packets = rdpcap(path) + assert len(packets) == 5 + finally: + os.unlink(path) + + def test_wrpcapng_empty(self): + """Write empty packet list to PcapNG.""" + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) as f: + path = f.name + try: + wrpcapng(path, []) + packets = rdpcap(path) + assert len(packets) == 0 + finally: + os.unlink(path) + + def test_wrpcap_auto_detect_pcapng_extension(self): + """wrpcap should auto-detect .pcapng extension and write PcapNG format.""" + pkt = build_eth_ip_tcp() + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) as f: + path = f.name + try: + wrpcap(path, [pkt]) + packets = rdpcap(path) + assert len(packets) == 1 + finally: + os.unlink(path) + + +class TestPcapNgRead: + """Test PcapNG file reading with auto-detection.""" + + def test_rdpcap_reads_pcapng(self): + """rdpcap should auto-detect PcapNG format.""" + pkt = build_eth_ip_tcp() + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) 
as f: + path = f.name + try: + wrpcapng(path, [pkt]) + packets = rdpcap(path) + assert len(packets) == 1 + finally: + os.unlink(path) + + def test_rdpcap_still_reads_pcap(self): + """rdpcap should still handle classic PCAP files.""" + pkt = build_eth_ip_tcp() + with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as f: + path = f.name + try: + wrpcap(path, [pkt]) + packets = rdpcap(path) + assert len(packets) == 1 + finally: + os.unlink(path) + + def test_pcap_reader_pcapng(self): + """PcapReader should work with PcapNG files.""" + pkts = [build_eth_ip_tcp(sport=2000 + i) for i in range(3)] + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) as f: + path = f.name + try: + wrpcapng(path, pkts) + reader = PcapReader(path) + count = 0 + for pcap_pkt in reader: + assert isinstance(pcap_pkt, PcapPacket) + count += 1 + assert count == 3 + finally: + os.unlink(path) + + +class TestPcapNgRoundtrip: + """Test round-trip: write PcapNG -> read -> verify content.""" + + def test_roundtrip_preserves_data(self): + """Written and read-back packets should have matching raw bytes.""" + pkt = build_eth_ip_tcp() + raw_bytes = pkt.bytes() + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) as f: + path = f.name + try: + wrpcapng(path, [pkt]) + packets = rdpcap(path) + assert packets[0].packet.bytes() == raw_bytes + finally: + os.unlink(path) + + def test_roundtrip_mixed_protocols(self): + """PcapNG should handle mixed TCP and UDP packets.""" + pkts = [ + build_eth_ip_tcp(sport=1234), + build_eth_ip_udp(sport=5678), + build_eth_ip_tcp(sport=9012), + ] + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) as f: + path = f.name + try: + wrpcapng(path, pkts) + read_pkts = rdpcap(path) + assert len(read_pkts) == 3 + finally: + os.unlink(path) + + def test_pcapng_to_pcap_conversion(self): + """Write PcapNG, read, write as PCAP, read again.""" + pkt = build_eth_ip_tcp() + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) as 
f: + ng_path = f.name + with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as f: + pcap_path = f.name + try: + wrpcapng(ng_path, [pkt]) + packets = rdpcap(ng_path) + wrpcap(pcap_path, [p.packet for p in packets]) + packets2 = rdpcap(pcap_path) + assert len(packets2) == 1 + assert packets2[0].packet.bytes() == packets[0].packet.bytes() + finally: + os.unlink(ng_path) + os.unlink(pcap_path) + + +class TestExtractFlowsPcapNg: + """Test flow extraction from PcapNG files.""" + + def test_extract_flows_from_pcapng(self): + """extract_flows should work with PcapNG files.""" + pkts = [ + build_eth_ip_tcp(sport=1111, dport=80), + build_eth_ip_tcp(sport=1111, dport=80), + build_eth_ip_tcp(sport=2222, dport=443), + ] + with tempfile.NamedTemporaryFile(suffix=".pcapng", delete=False) as f: + path = f.name + try: + wrpcapng(path, pkts) + flows = extract_flows(path) + assert len(flows) >= 1 + finally: + os.unlink(path) diff --git a/tests/python/test_streaming_flows.py b/tests/python/test_streaming_flows.py new file mode 100644 index 0000000..5d8f34f --- /dev/null +++ b/tests/python/test_streaming_flows.py @@ -0,0 +1,136 @@ +"""Tests for streaming flow extraction with memory budget.""" + +import os +import tempfile + +from stackforge import ( + IP, + TCP, + UDP, + Ether, + FlowConfig, + extract_flows, + wrpcap, +) + + +def build_tcp_pkt(src_ip="10.0.0.1", dst_ip="10.0.0.2", sport=12345, dport=80): + """Build a simple TCP packet.""" + stack = Ether() / IP(src=src_ip, dst=dst_ip) / TCP(sport=sport, dport=dport) + pkt = stack.build() + pkt.parse() + return pkt + + +def build_udp_pkt(src_ip="10.0.0.1", dst_ip="10.0.0.2", sport=5000, dport=53): + """Build a simple UDP packet.""" + stack = Ether() / IP(src=src_ip, dst=dst_ip) / UDP(sport=sport, dport=dport) + pkt = stack.build() + pkt.parse() + return pkt + + +class TestStreamingFlowExtraction: + """Test that extract_flows works in streaming mode (from file).""" + + def test_basic_streaming(self): + """extract_flows should 
stream packets from file, not load all at once.""" + pkts = [build_tcp_pkt(sport=1000 + i) for i in range(10)] + with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as f: + path = f.name + try: + wrpcap(path, pkts) + flows = extract_flows(path) + assert len(flows) >= 1 + total_pkts = sum(f.total_packets for f in flows) + assert total_pkts == 10 + finally: + os.unlink(path) + + def test_streaming_multiple_flows(self): + """Streaming should correctly separate different flows.""" + pkts = [ + build_tcp_pkt(sport=1111, dport=80), + build_tcp_pkt(sport=1111, dport=80), + build_tcp_pkt(sport=2222, dport=443), + build_tcp_pkt(sport=2222, dport=443), + build_udp_pkt(sport=3333, dport=53), + ] + with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as f: + path = f.name + try: + wrpcap(path, pkts) + flows = extract_flows(path) + assert len(flows) == 3 + finally: + os.unlink(path) + + +class TestMemoryBudgetFlowConfig: + """Test FlowConfig with memory budget parameters.""" + + def test_flowconfig_with_memory_budget(self): + """FlowConfig should accept memory_budget parameter.""" + config = FlowConfig(memory_budget=50 * 1024 * 1024) # 50MB + assert config is not None + + def test_flowconfig_with_spill_dir(self): + """FlowConfig should accept spill_dir parameter.""" + with tempfile.TemporaryDirectory() as tmpdir: + config = FlowConfig(memory_budget=1024, spill_dir=tmpdir) + assert config is not None + + def test_extract_flows_with_budget(self): + """extract_flows should work with a memory budget.""" + pkts = [build_tcp_pkt(sport=1000 + i) for i in range(5)] + with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as f: + path = f.name + try: + wrpcap(path, pkts) + config = FlowConfig(memory_budget=10 * 1024 * 1024) # 10MB + flows = extract_flows(path, config=config) + assert len(flows) >= 1 + finally: + os.unlink(path) + + def test_extract_flows_small_budget(self): + """extract_flows should still work with a very small memory budget.""" + pkts = 
[build_tcp_pkt(sport=1000 + i) for i in range(20)] + with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as f: + path = f.name + try: + wrpcap(path, pkts) + config = FlowConfig(memory_budget=1024) # Very small: 1KB + flows = extract_flows(path, config=config) + assert len(flows) >= 1 + total_pkts = sum(f.total_packets for f in flows) + assert total_pkts == 20 + finally: + os.unlink(path) + + def test_extract_flows_no_budget(self): + """extract_flows without budget should work (unlimited memory).""" + pkts = [build_tcp_pkt(sport=1000 + i) for i in range(5)] + with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as f: + path = f.name + try: + wrpcap(path, pkts) + config = FlowConfig() # No budget + flows = extract_flows(path, config=config) + assert len(flows) >= 1 + finally: + os.unlink(path) + + def test_budget_with_spill_dir(self): + """Memory budget with custom spill directory should work.""" + pkts = [build_tcp_pkt(sport=1000 + i) for i in range(10)] + with tempfile.NamedTemporaryFile(suffix=".pcap", delete=False) as f: + path = f.name + with tempfile.TemporaryDirectory() as spill_dir: + try: + wrpcap(path, pkts) + config = FlowConfig(memory_budget=512, spill_dir=spill_dir) + flows = extract_flows(path, config=config) + assert len(flows) >= 1 + finally: + os.unlink(path)