diff --git a/Cargo.lock b/Cargo.lock index 1dd8b22..f2506ca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1030,7 +1030,7 @@ dependencies = [ [[package]] name = "stackforge" -version = "0.5.0" +version = "0.6.0" dependencies = [ "pyo3", "stackforge-automata", @@ -1039,7 +1039,7 @@ dependencies = [ [[package]] name = "stackforge-automata" -version = "0.5.0" +version = "0.6.0" dependencies = [ "bytes", "stackforge-core", @@ -1047,7 +1047,7 @@ dependencies = [ [[package]] name = "stackforge-core" -version = "0.5.0" +version = "0.6.0" dependencies = [ "aes", "aes-gcm", diff --git a/Cargo.toml b/Cargo.toml index d5ba2a2..e576aa0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ shared-version = true tag-name = "v{{version}}" [workspace.package] -version = "0.5.0" +version = "0.6.0" edition = "2024" license = "GPL-3.0-only" authors = ["Stackforge Contributors"] diff --git a/README.md b/README.md index 8275991..3df6337 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,8 @@ - **Scapy-style API** — Stack layers with `Ether() / IP() / TCP()`, set fields with keyword arguments - **High Performance** — Core logic in Rust, zero-copy parsing, copy-on-write mutation -- **Broad Protocol Support** — Ethernet, ARP, IPv4/IPv6, TCP, UDP, ICMP, ICMPv6, DNS, HTTP/1.x, HTTP/2, QUIC, L2TP, 802.11 (Wi-Fi), 802.15.4 (Zigbee), and custom protocols -- **Stateful Flow Extraction** — Extract bidirectional conversations from PCAP files with TCP state tracking, stream reassembly, and UDP timeout handling +- **Broad Protocol Support** — Ethernet, ARP, IPv4/IPv6, TCP, UDP, ICMP/ICMPv6 (with echo correlation), DNS, HTTP/1.x, HTTP/2, QUIC, L2TP, 802.11 (Wi-Fi), 802.15.4 (Zigbee), and custom protocols +- **Stateful Flow Extraction** — Extract bidirectional conversations from PCAP files with TCP state tracking, stream reassembly, UDP timeout handling, and optional max packet/flow length tracking - **PCAP I/O** — Read and write pcap files with `rdpcap()` / `wrpcap()` - **Python Bindings** — 
Seamless integration via PyO3/maturin - **Custom Protocols** — Define runtime protocols with `CustomLayer` and typed fields @@ -303,6 +303,45 @@ config = FlowConfig( conversations = extract_flows("capture.pcap", config=config) ``` +Optional: Track maximum packet sizes during flow extraction: + +```python +config = FlowConfig( + track_max_packet_len=True, # Track max per-direction (forward_max_packet_len, reverse_max_packet_len) + track_max_flow_len=True, # Track overall max (max_flow_len) +) +conversations = extract_flows("capture.pcap", config=config) + +for conv in conversations: + print(f"Max fwd packet: {conv.forward_max_packet_len} bytes") + print(f"Max rev packet: {conv.reverse_max_packet_len} bytes") + print(f"Max overall: {conv.max_flow_len} bytes") +``` + +Disabled by default (zero overhead). Enable only when needed for flow analysis. + +#### ICMP and ICMPv6 Flow Tracking + +Automatically correlate ICMP echo request/reply pairs and track other ICMP message types: + +```python +conversations = extract_flows("capture.pcap") + +for conv in conversations: + if conv.protocol == "ICMP" or conv.protocol == "ICMPv6": + print(f"ICMP Echo: {conv.src_addr} <-> {conv.dst_addr}") + print(f" Type: {conv.icmp_type}, Code: {conv.icmp_code}") + print(f" Identifier: {conv.icmp_identifier}") + print(f" Requests: {conv.icmp_request_count}, Replies: {conv.icmp_reply_count}") + print(f" Last seq: {conv.icmp_last_seq}") +``` + +Features: +- Echo request/reply pairs correlated via identifier (symmetric src/dst ports) +- Non-echo message types tracked via (type, code) substitution +- Properties: `icmp_type`, `icmp_code`, `icmp_identifier`, `icmp_request_count`, `icmp_reply_count`, `icmp_last_seq` +- Returns `None` for non-ICMP flows + ## Rust Crate The core library is available as a standalone Rust crate: diff --git a/crates/stackforge-core/src/flow/config.rs b/crates/stackforge-core/src/flow/config.rs index b6034a0..1255d99 100644 --- a/crates/stackforge-core/src/flow/config.rs 
+++ b/crates/stackforge-core/src/flow/config.rs @@ -20,6 +20,10 @@ pub struct FlowConfig { pub max_ooo_fragments: usize, /// Interval between idle conversation eviction sweeps (default: 30s). pub eviction_interval: Duration, + /// Track maximum packet length per direction (default: false). + pub track_max_packet_len: bool, + /// Track the maximum packet length across both directions of a flow (default: false). + pub track_max_flow_len: bool, } impl Default for FlowConfig { @@ -32,6 +36,8 @@ impl Default for FlowConfig { max_reassembly_buffer: 16 * 1024 * 1024, // 16 MB max_ooo_fragments: 100, eviction_interval: Duration::from_secs(30), + track_max_packet_len: false, + track_max_flow_len: false, } } } @@ -50,5 +56,7 @@ mod tests { assert_eq!(config.max_reassembly_buffer, 16 * 1024 * 1024); assert_eq!(config.max_ooo_fragments, 100); assert_eq!(config.eviction_interval, Duration::from_secs(30)); + assert!(!config.track_max_packet_len); + assert!(!config.track_max_flow_len); } } diff --git a/crates/stackforge-core/src/flow/mod.rs b/crates/stackforge-core/src/flow/mod.rs index 9892dcc..0690ba3 100644 --- a/crates/stackforge-core/src/flow/mod.rs +++ b/crates/stackforge-core/src/flow/mod.rs @@ -121,7 +121,7 @@ pub fn extract_zwave_flows( state }); - conv.record_packet(direction, byte_count, timestamp, index); + conv.record_packet(direction, byte_count, timestamp, index, false, false); // Track ACK vs command frames if let ProtocolState::ZWave(ref mut zw) = conv.protocol_state diff --git a/crates/stackforge-core/src/flow/state.rs b/crates/stackforge-core/src/flow/state.rs index de8dffb..b3c37e6 100644 --- a/crates/stackforge-core/src/flow/state.rs +++ b/crates/stackforge-core/src/flow/state.rs @@ -48,6 +48,8 @@ pub struct DirectionStats { pub first_seen: Duration, /// Timestamp of the most recent packet in this direction. pub last_seen: Duration, + /// Maximum packet length in this direction (if tracking enabled).
+ pub max_packet_len: Option, } impl DirectionStats { @@ -58,14 +60,18 @@ impl DirectionStats { bytes: 0, first_seen: timestamp, last_seen: timestamp, + max_packet_len: None, } } /// Record a new packet in this direction. - pub fn record_packet(&mut self, byte_count: u64, timestamp: Duration) { + pub fn record_packet(&mut self, byte_count: u64, timestamp: Duration, track_max_len: bool) { self.packets += 1; self.bytes += byte_count; self.last_seen = timestamp; + if track_max_len { + self.max_packet_len = Some(self.max_packet_len.unwrap_or(0).max(byte_count)); + } } } @@ -120,6 +126,8 @@ pub struct ConversationState { pub packet_indices: Vec, /// Protocol-specific state. pub protocol_state: ProtocolState, + /// Maximum packet length across both directions (if tracking enabled). + pub max_flow_len: Option, } impl ConversationState { @@ -143,6 +151,7 @@ impl ConversationState { reverse: DirectionStats::new(timestamp), packet_indices: Vec::new(), protocol_state, + max_flow_len: None, } } @@ -179,6 +188,7 @@ impl ConversationState { command_count: 0, ack_count: 0, }), + max_flow_len: None, } } @@ -207,13 +217,26 @@ impl ConversationState { byte_count: u64, timestamp: Duration, packet_index: usize, + track_max_packet_len: bool, + track_max_flow_len: bool, ) { self.last_seen = timestamp; self.packet_indices.push(packet_index); match direction { - FlowDirection::Forward => self.forward.record_packet(byte_count, timestamp), - FlowDirection::Reverse => self.reverse.record_packet(byte_count, timestamp), + FlowDirection::Forward => { + self.forward + .record_packet(byte_count, timestamp, track_max_packet_len); + }, + FlowDirection::Reverse => { + self.reverse + .record_packet(byte_count, timestamp, track_max_packet_len); + }, + } + + // Update max flow length if tracking is enabled + if track_max_flow_len { + self.max_flow_len = Some(self.max_flow_len.unwrap_or(0).max(byte_count)); } } diff --git a/crates/stackforge-core/src/flow/table.rs 
b/crates/stackforge-core/src/flow/table.rs index 8d595dc..9b76f4f 100644 --- a/crates/stackforge-core/src/flow/table.rs +++ b/crates/stackforge-core/src/flow/table.rs @@ -72,7 +72,14 @@ impl ConversationTable { let conv = entry.value_mut(); // Record packet stats - conv.record_packet(direction, byte_count, timestamp, packet_index); + conv.record_packet( + direction, + byte_count, + timestamp, + packet_index, + self.config.track_max_packet_len, + self.config.track_max_flow_len, + ); // Process protocol-specific state let buf = packet.as_bytes(); diff --git a/src/lib.rs b/src/lib.rs index 0f6d12f..41a87bb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3861,6 +3861,10 @@ impl PyPcapPacket { hexdump_bytes(self.inner.packet.as_bytes()) } + fn bytes<'py>(&self, py: Python<'py>) -> Bound<'py, PyBytes> { + PyBytes::new(py, self.inner.packet.as_bytes()) + } + fn __repr__(&self) -> String { format!( "", @@ -4011,6 +4015,8 @@ impl PyFlowConfig { udp_timeout=120.0, max_reassembly_buffer=16777216, max_ooo_fragments=100, + track_max_packet_len=false, + track_max_flow_len=false, ))] fn new( tcp_established_timeout: f64, @@ -4019,6 +4025,8 @@ impl PyFlowConfig { udp_timeout: f64, max_reassembly_buffer: usize, max_ooo_fragments: usize, + track_max_packet_len: bool, + track_max_flow_len: bool, ) -> Self { Self { inner: stackforge_core::FlowConfig { @@ -4030,6 +4038,8 @@ impl PyFlowConfig { udp_timeout: std::time::Duration::from_secs_f64(udp_timeout), max_reassembly_buffer, max_ooo_fragments, + track_max_packet_len, + track_max_flow_len, ..stackforge_core::FlowConfig::default() }, } @@ -4137,6 +4147,24 @@ impl PyConversation { self.inner.total_bytes() } + /// Maximum packet length in forward direction. + #[getter] + fn forward_max_packet_len(&self) -> Option { + self.inner.forward.max_packet_len + } + + /// Maximum packet length in reverse direction. 
+ #[getter] + fn reverse_max_packet_len(&self) -> Option { + self.inner.reverse.max_packet_len + } + + /// Maximum packet length across both directions. + #[getter] + fn max_flow_len(&self) -> Option { + self.inner.max_flow_len + } + /// Indices of packets belonging to this conversation. #[getter] fn packet_indices(&self) -> Vec { diff --git a/tests/python/test_max_packet_flow_len.py b/tests/python/test_max_packet_flow_len.py new file mode 100644 index 0000000..2f0ad62 --- /dev/null +++ b/tests/python/test_max_packet_flow_len.py @@ -0,0 +1,252 @@ +import pytest +from stackforge import IP, TCP, UDP, Ether, FlowConfig, Packet, Raw, extract_flows_from_packets + + +class TestMaxPacketFlowLengthTracking: + """Test max packet and flow length tracking in flow extraction.""" + + def test_default_no_tracking(self): + """By default, no tracking is enabled.""" + packets = [] + for i in range(3): + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.1", dst="10.0.0.2") + / TCP(sport=5000 + i, dport=80) + / Raw(b"x" * (100 + i * 50)) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + config = FlowConfig() # defaults: no tracking + flows = extract_flows_from_packets(packets, config) + conv = flows[0] + + assert conv.forward_max_packet_len is None + assert conv.reverse_max_packet_len is None + assert conv.max_flow_len is None + + def test_track_max_packet_len_forward_only(self): + """Track max packet length in forward direction.""" + packets = [] + # Forward: 100, 150, 200 bytes + for i in range(3): + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.1", dst="10.0.0.2") + / TCP(sport=5000, dport=80) + / Raw(b"x" * (100 + i * 50)) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + config = FlowConfig(track_max_packet_len=True) + flows = extract_flows_from_packets(packets, config) + conv = flows[0] + + # All packets are in forward direction + assert conv.forward_max_packet_len is not None + assert conv.forward_max_packet_len > 200 # 200 bytes + headers + assert 
conv.reverse_max_packet_len is None + assert conv.max_flow_len is None + + def test_track_max_packet_len_bidirectional(self): + """Track max packet length in both directions separately.""" + packets = [] + # Forward: 100 bytes + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.1", dst="10.0.0.2") + / TCP(sport=5000, dport=80) + / Raw(b"x" * 100) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + # Reverse: 500 bytes + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.2", dst="10.0.0.1") + / TCP(sport=80, dport=5000) + / Raw(b"y" * 500) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + config = FlowConfig(track_max_packet_len=True) + flows = extract_flows_from_packets(packets, config) + conv = flows[0] + + assert conv.forward_max_packet_len is not None + assert conv.reverse_max_packet_len is not None + assert conv.reverse_max_packet_len > conv.forward_max_packet_len + assert conv.max_flow_len is None + + def test_track_max_flow_len(self): + """Track max flow length (largest packet overall).""" + packets = [] + # 100 bytes + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.1", dst="10.0.0.2") + / TCP(sport=5000, dport=80) + / Raw(b"x" * 100) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + # 500 bytes + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.2", dst="10.0.0.1") + / TCP(sport=80, dport=5000) + / Raw(b"y" * 500) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + config = FlowConfig(track_max_flow_len=True) + flows = extract_flows_from_packets(packets, config) + conv = flows[0] + + assert conv.forward_max_packet_len is None + assert conv.reverse_max_packet_len is None + assert conv.max_flow_len is not None + # Should be max of all packets + assert conv.max_flow_len > 500 + + def test_track_both_max_packet_and_flow_len(self): + """Track both max packet length and max flow length.""" + packets = [] + sizes = [100, 200, 300, 400, 500] + for i, size in enumerate(sizes): + src = "10.0.0.1" if i % 2 == 0 else "10.0.0.2" + dst = "10.0.0.2" if i % 2 == 0 
else "10.0.0.1" + sport = 5000 if i % 2 == 0 else 80 + dport = 80 if i % 2 == 0 else 5000 + pkt = Packet( + ( + Ether() + / IP(src=src, dst=dst) + / TCP(sport=sport, dport=dport) + / Raw(b"x" * size) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + config = FlowConfig(track_max_packet_len=True, track_max_flow_len=True) + flows = extract_flows_from_packets(packets, config) + conv = flows[0] + + # Both should be tracked + assert conv.forward_max_packet_len is not None + assert conv.reverse_max_packet_len is not None + assert conv.max_flow_len is not None + + # max_flow_len should be >= both directional maxes + assert conv.max_flow_len >= conv.forward_max_packet_len + assert conv.max_flow_len >= conv.reverse_max_packet_len + + def test_max_packet_len_multiple_packets_same_direction(self): + """Test that max_packet_len tracks the largest packet in multiple packets.""" + packets = [] + sizes = [50, 150, 100, 200, 75] # Max is 200 + for size in sizes: + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.1", dst="10.0.0.2") + / TCP(sport=5000, dport=80) + / Raw(b"x" * size) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + config = FlowConfig(track_max_packet_len=True) + flows = extract_flows_from_packets(packets, config) + conv = flows[0] + + # Largest packet is 200 bytes + headers + assert conv.forward_max_packet_len is not None + assert conv.forward_max_packet_len > 200 + + def test_different_protocols_different_flows(self): + """Test that TCP and UDP flows don't interfere with each other.""" + packets = [] + + # TCP flow + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.1", dst="10.0.0.2") + / TCP(sport=5000, dport=80) + / Raw(b"x" * 100) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + # UDP flow + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.1", dst="10.0.0.2") + / UDP(sport=5000, dport=53) + / Raw(b"y" * 500) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + config = FlowConfig(track_max_packet_len=True, track_max_flow_len=True) + flows = 
extract_flows_from_packets(packets, config) + + # Should have 2 flows + assert len(flows) == 2 + + tcp_flow = next(f for f in flows if f.protocol == "TCP") + udp_flow = next(f for f in flows if f.protocol == "UDP") + + # UDP should have larger packet + assert udp_flow.max_flow_len > tcp_flow.max_flow_len + + def test_can_disable_tracking(self): + """Test that setting flags to False disables tracking.""" + packets = [] + for i in range(3): + pkt = Packet( + ( + Ether() + / IP(src="10.0.0.1", dst="10.0.0.2") + / TCP(sport=5000 + i, dport=80) + / Raw(b"x" * (100 + i * 50)) + ).bytes() + ) + pkt.parse() + packets.append(pkt) + + config = FlowConfig(track_max_packet_len=False, track_max_flow_len=False) + flows = extract_flows_from_packets(packets, config) + conv = flows[0] + + assert conv.forward_max_packet_len is None + assert conv.reverse_max_packet_len is None + assert conv.max_flow_len is None + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])