diff --git a/Cargo.toml b/Cargo.toml index 2ddbdaa..126604e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -97,6 +97,7 @@ wasm = [ serde = [ "dep:serde", "ipnet/serde", + "serde/rc", ] native-tls = [ "oneio/native-tls", diff --git a/benches/internals.rs b/benches/internals.rs index bc14382..5866367 100644 --- a/benches/internals.rs +++ b/benches/internals.rs @@ -91,6 +91,19 @@ pub fn criterion_benchmark(c: &mut Criterion) { }) }); + c.bench_function("updates into_route_iter", |b| { + b.iter(|| { + let mut reader = black_box(&updates[..]); + + BgpkitParser::from_reader(&mut reader) + .into_route_iter() + .take(RECORD_LIMIT) + .for_each(|x| { + black_box(x); + }); + }) + }); + c.bench_function("updates into_raw_record_iter", |b| { b.iter(|| { let mut reader = black_box(&updates[..]); @@ -143,6 +156,19 @@ pub fn criterion_benchmark(c: &mut Criterion) { }) }); + c.bench_function("rib into_route_iter", |b| { + b.iter(|| { + let mut reader = black_box(&rib_dump[..]); + + BgpkitParser::from_reader(&mut reader) + .into_route_iter() + .take(RECORD_LIMIT) + .for_each(|x| { + black_box(x); + }); + }) + }); + c.bench_function("rib into_raw_record_iter", |b| { b.iter(|| { let mut reader = black_box(&rib_dump[..]); diff --git a/examples/route_level_parsing.rs b/examples/route_level_parsing.rs new file mode 100644 index 0000000..9e7dba0 --- /dev/null +++ b/examples/route_level_parsing.rs @@ -0,0 +1,144 @@ +use bgpkit_parser::{BgpkitParser, Filterable}; +use std::time::Instant; + +/// This example demonstrates the lightweight route-level parser (`into_route_iter()`) +/// which provides significantly faster processing when you only need basic route +/// information (prefix, AS path, peer metadata) without full BGP attributes. +/// +/// Performance characteristics: +/// - Updates files: ~10-15% faster (fewer attributes to skip) +/// - RIB dump files: ~50-70% faster (many attributes per route) +/// +/// Use `into_route_iter()` when you need: +/// - prefix, AS path, peer IP/AS, timestamp +/// - Fast scanning/filtering of large datasets +/// - No need for communities, MED, next-hop, local-pref, etc. +/// +/// Use `into_elem_iter()` when you need: +/// - Full BGP attributes (communities, MED, next-hop, local-pref, etc.) +/// - Community-based filtering +fn main() { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); + + // Example 1: Download and parse an updates file using route iterator + log::info!("=== Example 1: Route-level parsing ==="); + + let url = "http://archive.routeviews.org/bgpdata/2021.10/UPDATES/updates.20211001.0000.bz2"; + + let start = Instant::now(); + let parser = BgpkitParser::new(url).unwrap(); + + let mut route_count = 0; + for route in parser.into_route_iter().take(1000) { + if route_count < 3 { + // Show first few routes + log::info!( + "Route {}: {} via AS{} (peer: {})", + route_count + 1, + route.prefix, + route.peer_asn, + route.peer_ip + ); + if let Some(ref path) = route.as_path { + log::info!(" AS Path: {}", path); + } + } + route_count += 1; + } + let route_time = start.elapsed(); + log::info!( + "Route-level parsing: {} routes in {:.3}s", + route_count, + route_time.as_secs_f64() + ); + + // Example 2: Compare with element-level parsing + log::info!("\n=== Example 2: Element-level parsing (full attributes) ==="); + + let start = Instant::now(); + let parser = BgpkitParser::new(url).unwrap(); + + let mut elem_count = 0; + for elem in parser.into_elem_iter().take(1000) { + if elem_count < 3 { + log::info!( + "Element {}: {} via AS{} (next-hop: {:?})", + elem_count + 1, + elem.prefix, + elem.peer_asn, + elem.next_hop + ); + if let Some(ref communities) = elem.communities { + log::info!(" Communities: {:?}", communities); + } + } + elem_count += 1; + } + let elem_time = start.elapsed(); + log::info!( + "Element-level parsing: {} elements in {:.3}s", + elem_count, + elem_time.as_secs_f64() + ); + + // Example 3: Filtering with route elements + log::info!("\n=== Example 3: Filtering route elements ==="); + + let parser = BgpkitParser::new(url).unwrap(); + // Filter for routes from peer AS49788 (seen in the output above) + let filter = bgpkit_parser::Filter::new("peer_asn", "49788").unwrap(); + + let mut filtered_count = 0; + for route in parser.into_route_iter().take(1000) { + if route.match_filter(&filter) { + filtered_count += 1; + if filtered_count <= 3 { + log::info!( + "Matched filter (peer_asn=49788): {} from AS{}", + route.prefix, + route.peer_asn + ); + } + } + } + log::info!( + "Total routes matching filter (first 1000): {}", + filtered_count + ); + + // Example 4: Demonstrate AS path filtering + log::info!("\n=== Example 4: AS path filtering ==="); + + let parser = BgpkitParser::new(url).unwrap(); + // Filter for routes with AS1299 somewhere in the path + let as_path_filter = bgpkit_parser::Filter::new("as_path", "1299").unwrap(); + + let mut as_path_matches = 0; + for route in parser.into_route_iter().take(1000) { + if route.match_filter(&as_path_filter) { + as_path_matches += 1; + if as_path_matches <= 3 { + log::info!( + "AS Path contains 1299: {} - path: {:?}", + route.prefix, + route.as_path.as_ref().map(|p| p.to_string()) + ); + } + } + } + log::info!( + "Total routes with AS1299 in path (first 1000): {}", + as_path_matches + ); + + log::info!("\n=== Summary ==="); + log::info!( + "Route-level: {:.3}s | Element-level: {:.3}s", + route_time.as_secs_f64(), + elem_time.as_secs_f64() + ); + log::info!(""); + log::info!("Performance gain is most significant for RIB dumps with many attributes."); + log::info!("For update files, the difference is smaller since there are fewer attributes."); + log::info!("Note: Community filters are NOT supported with route elements."); +} diff --git a/src/lib.rs b/src/lib.rs index 257b551..1bb5958 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -839,6 +839,7 @@ pub mod parser; pub mod wasm; pub use models::BgpElem; +pub use models::BgpRouteElem; pub use models::MrtRecord; #[cfg(feature = "parser")] pub use parser::*; diff --git a/src/models/bgp/elem.rs b/src/models/bgp/elem.rs index f301739..164e498 100644 --- a/src/models/bgp/elem.rs +++ b/src/models/bgp/elem.rs @@ -4,6 +4,7 @@ use std::cmp::Ordering; use std::fmt::{Display, Formatter}; use std::net::IpAddr; use std::str::FromStr; +use std::sync::Arc; // TODO(jmeggitt): BgpElem can be converted to an enum. Apply this change during performance PR. @@ -156,14 +157,49 @@ pub struct BgpElem { pub deprecated: Option>, } +/// Lightweight per-prefix route element. +/// +/// This struct is intended for fast scans that only need route identity, +/// peer metadata, timestamp, and AS path. Use [`BgpElem`] when you need the +/// full set of BGP attributes. Because route elements do not carry +/// communities, community filters do not match [`BgpRouteElem`] values. +#[derive(Debug, Clone, PartialEq)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct BgpRouteElem { + /// The timestamp of the item in floating-point format. + pub timestamp: f64, + /// The element type of an item. + #[cfg_attr(feature = "serde", serde(rename = "type"))] + pub elem_type: ElemType, + /// The IP address of the peer associated with the item. + pub peer_ip: IpAddr, + /// The peer ASN of the item. + pub peer_asn: Asn, + /// The network prefix of the item. + pub prefix: NetworkPrefix, + /// The optional path representation of the item. + /// + /// Route-level parsing shares the same AS path across all announced + /// prefixes from a single message. + pub as_path: Option>, +} + impl Eq for BgpElem {} +impl Eq for BgpRouteElem {} + impl PartialOrd for BgpElem { fn partial_cmp(&self, other: &Self) -> Option { Some(self.cmp(other)) } } +impl PartialOrd for BgpRouteElem { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + impl Ord for BgpElem { fn cmp(&self, other: &Self) -> Ordering { self.timestamp @@ -173,6 +209,15 @@ impl Ord for BgpElem { } } +impl Ord for BgpRouteElem { + fn cmp(&self, other: &Self) -> Ordering { + self.timestamp + .partial_cmp(&other.timestamp) + .unwrap() + .then_with(|| self.peer_ip.cmp(&other.peer_ip)) + } +} + impl Default for BgpElem { fn default() -> Self { BgpElem { @@ -279,6 +324,25 @@ impl Display for BgpElem { } } +impl Display for BgpRouteElem { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + let t = match self.elem_type { + ElemType::ANNOUNCE => "A", + ElemType::WITHDRAW => "W", + }; + write!( + f, + "{}|{}|{}|{}|{}|{}", + t, + &self.timestamp, + &self.peer_ip, + &self.peer_asn, + &self.prefix, + OptionToStr(&self.as_path), + ) + } +} + impl BgpElem { /// Returns true if the element is an announcement. /// @@ -424,6 +488,37 @@ mod tests { assert!(elem2 < elem3); } + #[test] + fn test_route_elem_sorting() { + let elem1 = BgpRouteElem { + timestamp: 1.1, + elem_type: ElemType::ANNOUNCE, + peer_ip: IpAddr::from_str("192.168.1.1").unwrap(), + peer_asn: 0.into(), + prefix: NetworkPrefix::from_str("8.8.8.0/24").unwrap(), + as_path: None, + }; + let elem2 = BgpRouteElem { + timestamp: 1.2, + elem_type: ElemType::ANNOUNCE, + peer_ip: IpAddr::from_str("192.168.1.1").unwrap(), + peer_asn: 0.into(), + prefix: NetworkPrefix::from_str("8.8.8.0/24").unwrap(), + as_path: None, + }; + let elem3 = BgpRouteElem { + timestamp: 1.2, + elem_type: ElemType::ANNOUNCE, + peer_ip: IpAddr::from_str("192.168.1.2").unwrap(), + peer_asn: 0.into(), + prefix: NetworkPrefix::from_str("8.8.8.0/24").unwrap(), + as_path: None, + }; + + assert!(elem1 < elem2); + assert!(elem2 < elem3); + } + #[test] fn test_psv() { assert_eq!( @@ -437,6 +532,23 @@ mod tests { ); } + #[test] + fn test_route_elem_display() { + let elem = BgpRouteElem { + timestamp: 1.1, + elem_type: ElemType::ANNOUNCE, + peer_ip: IpAddr::from_str("192.168.1.1").unwrap(), + peer_asn: 64496.into(), + prefix: NetworkPrefix::from_str("8.8.8.0/24").unwrap(), + as_path: Some(Arc::new(AsPath::from_sequence([64496, 64497]))), + }; + + assert_eq!( + elem.to_string().as_str(), + "A|1.1|192.168.1.1|64496|8.8.8.0/24|64496 64497" + ); + } + #[test] fn test_option_to_str() { let asn_opt: Option = Some(12); diff --git a/src/parser/bgp/attributes/mod.rs b/src/parser/bgp/attributes/mod.rs index 9011489..8bf731a 100644 --- a/src/parser/bgp/attributes/mod.rs +++ b/src/parser/bgp/attributes/mod.rs @@ -22,7 +22,8 @@ use crate::models::*; use crate::error::{BgpValidationWarning, ParserError}; use crate::parser::bgp::attributes::attr_01_origin::{encode_origin, parse_origin}; -use crate::parser::bgp::attributes::attr_02_17_as_path::{encode_as_path, parse_as_path}; +use crate::parser::bgp::attributes::attr_02_17_as_path::encode_as_path; +pub(crate) use crate::parser::bgp::attributes::attr_02_17_as_path::parse_as_path; use crate::parser::bgp::attributes::attr_03_next_hop::{encode_next_hop, parse_next_hop}; use crate::parser::bgp::attributes::attr_04_med::{encode_med, parse_med}; use crate::parser::bgp::attributes::attr_05_local_pref::{encode_local_pref, parse_local_pref}; @@ -34,7 +35,8 @@ use crate::parser::bgp::attributes::attr_09_originator::{ encode_originator_id, parse_originator_id, }; use crate::parser::bgp::attributes::attr_10_13_cluster::{encode_clusters, parse_clusters}; -use crate::parser::bgp::attributes::attr_14_15_nlri::{encode_nlri, parse_nlri}; +use crate::parser::bgp::attributes::attr_14_15_nlri::encode_nlri; +pub(crate) use crate::parser::bgp::attributes::attr_14_15_nlri::parse_nlri; use crate::parser::bgp::attributes::attr_16_25_extended_communities::{ encode_extended_communities, encode_ipv6_extended_communities, parse_extended_community, parse_ipv6_extended_community, @@ -128,6 +130,123 @@ fn is_well_known_mandatory(attr_type: AttrType) -> bool { ) } +pub(crate) struct AttributeValidationState { + warnings: Vec, + attr_mask: [u64; 4], +} + +impl AttributeValidationState { + pub(crate) fn new() -> Self { + Self { + warnings: Vec::new(), + attr_mask: [0; 4], + } + } + + fn has_raw_attr(&self, attr: u8) -> bool { + (self.attr_mask[(attr / 64) as usize] & (1u64 << (attr % 64))) != 0 + } + + pub(crate) fn has_attr(&self, attr_type: AttrType) -> bool { + self.has_raw_attr(u8::from(attr_type)) + } + + fn set_attr(&mut self, attr: u8) { + self.attr_mask[(attr / 64) as usize] |= 1u64 << (attr % 64); + } + + pub(crate) fn observe_header( + &mut self, + raw_attr_type: u8, + attr_type: AttrType, + flags: AttrFlags, + length: usize, + ) -> bool { + if self.has_raw_attr(raw_attr_type) { + self.warnings + .push(BgpValidationWarning::DuplicateAttribute { attr_type }); + } + self.set_attr(raw_attr_type); + + validate_attribute_flags(attr_type, flags, &mut self.warnings); + validate_attribute_length(attr_type, length, &mut self.warnings); + + flags.contains(AttrFlags::PARTIAL) + } + + pub(crate) fn observe_parse_error( + &mut self, + attr_type: AttrType, + partial: bool, + error: &ParserError, + ) { + if partial { + self.warnings + .push(BgpValidationWarning::PartialAttributeError { + attr_type, + reason: error.to_string(), + }); + debug!("PARTIAL attribute error: {}", error); + } else if is_well_known_mandatory(attr_type) { + self.warnings + .push(BgpValidationWarning::MalformedAttributeList { + reason: format!( + "Well-known mandatory attribute {} parsing failed: {}", + u8::from(attr_type), + error + ), + }); + debug!( + "Well-known mandatory attribute parsing failed, treating as withdraw: {}", + error + ); + } else { + self.warnings + .push(BgpValidationWarning::OptionalAttributeError { + attr_type, + reason: error.to_string(), + }); + debug!("Optional attribute error, discarding: {}", error); + } + } + + pub(crate) fn check_mandatory_attributes( + &mut self, + is_announcement: bool, + has_standard_nlri: bool, + ) { + if !is_announcement { + return; + } + + if !self.has_attr(AttrType::ORIGIN) { + self.warnings + .push(BgpValidationWarning::MissingWellKnownAttribute { + attr_type: AttrType::ORIGIN, + }); + } + + if !self.has_attr(AttrType::AS_PATH) { + self.warnings + .push(BgpValidationWarning::MissingWellKnownAttribute { + attr_type: AttrType::AS_PATH, + }); + } + + let has_mp_reach = self.has_attr(AttrType::MP_REACHABLE_NLRI); + if (has_standard_nlri || !has_mp_reach) && !self.has_attr(AttrType::NEXT_HOP) { + self.warnings + .push(BgpValidationWarning::MissingWellKnownAttribute { + attr_type: AttrType::NEXT_HOP, + }); + } + } + + pub(crate) fn finish(self) -> (Vec, [u64; 4]) { + (self.warnings, self.attr_mask) + } +} + /// Validate attribute length constraints fn validate_attribute_length( attr_type: AttrType, @@ -184,16 +303,7 @@ pub fn parse_attributes( // (flag + type + length). Cap at 256 to avoid over-allocation for corrupted data. let estimated_attrs = (data.remaining() / 3).min(256); let mut attributes: Vec = Vec::with_capacity(estimated_attrs.max(8)); - let mut validation_warnings: Vec = Vec::new(); - - // Bitmask for seen attributes (256 bits total, using 4x u64). Fits in registers/cache easily. - let mut seen_attributes: [u64; 4] = [0; 4]; - let has_attr = |seen: &[u64; 4], attr: u8| -> bool { - (seen[(attr / 64) as usize] & (1u64 << (attr % 64))) != 0 - }; - let set_attr = |seen: &mut [u64; 4], attr: u8| { - seen[(attr / 64) as usize] |= 1u64 << (attr % 64); - }; + let mut validation = AttributeValidationState::new(); while data.remaining() >= 3 { // each attribute is at least 3 bytes: flag(1) + type(1) + length(1) @@ -207,21 +317,6 @@ pub fn parse_attributes( true => data.read_u16()? as usize, }; - let mut partial = false; - if flag.contains(AttrFlags::PARTIAL) { - /* - https://datatracker.ietf.org/doc/html/rfc4271#section-4.3 - - > The third high-order bit (bit 2) of the Attribute Flags octet - > is the Partial bit. It defines whether the information - > contained in the optional transitive attribute is partial (if - > set to 1) or complete (if set to 0). For well-known attributes - > and for optional non-transitive attributes, the Partial bit - > MUST be set to 0. - */ - partial = true; - } - debug!( "reading attribute: type -- {:?}, length -- {}", &attr_type, attr_length @@ -229,18 +324,7 @@ pub fn parse_attributes( let parsed_attr_type = AttrType::from(attr_type); - // RFC 7606: Check for duplicate attributes - if has_attr(&seen_attributes, attr_type) { - validation_warnings.push(BgpValidationWarning::DuplicateAttribute { - attr_type: parsed_attr_type, - }); - // Continue processing - don't skip duplicate for now - } - set_attr(&mut seen_attributes, attr_type); - - // Validate attribute flags and length - validate_attribute_flags(parsed_attr_type, flag, &mut validation_warnings); - validate_attribute_length(parsed_attr_type, attr_length, &mut validation_warnings); + let partial = validation.observe_header(attr_type, parsed_attr_type, flag, attr_length); let attr_type = match parsed_attr_type { attr_type @ AttrType::Unknown(unknown_type) => { @@ -349,46 +433,17 @@ pub fn parse_attributes( attributes.push(Attribute { value, flag }); } Err(e) => { - // RFC 7606 error handling - if partial { - // Partial attribute with errors - log warning but continue - validation_warnings.push(BgpValidationWarning::PartialAttributeError { - attr_type, - reason: e.to_string(), - }); - debug!("PARTIAL attribute error: {}", e); - } else if is_well_known_mandatory(attr_type) { - // For well-known mandatory attributes, use "treat-as-withdraw" approach - // Don't break parsing, but log warning - validation_warnings.push(BgpValidationWarning::MalformedAttributeList { - reason: format!( - "Well-known mandatory attribute {} parsing failed: {}", - u8::from(attr_type), - e - ), - }); - debug!( - "Well-known mandatory attribute parsing failed, treating as withdraw: {}", - e - ); - } else { - // For optional attributes, use "attribute discard" approach - validation_warnings.push(BgpValidationWarning::OptionalAttributeError { - attr_type, - reason: e.to_string(), - }); - debug!("Optional attribute error, discarding: {}", e); - } - // Continue parsing in all cases - never break the session + validation.observe_parse_error(attr_type, partial, &e); continue; } }; } + let (validation_warnings, attr_mask) = validation.finish(); Ok(Attributes { inner: attributes, validation_warnings, - attr_mask: seen_attributes, + attr_mask, }) } diff --git a/src/parser/bgp/messages.rs b/src/parser/bgp/messages.rs index b51be01..47c06c5 100644 --- a/src/parser/bgp/messages.rs +++ b/src/parser/bgp/messages.rs @@ -29,6 +29,18 @@ struct RawBgpOpenHeader { const _: () = assert!(size_of::() == 10); +pub(crate) fn read_and_validate_bgp_marker(data: &mut Bytes) -> Result<(), ParserError> { + data.has_n_remaining(16)?; + + let mut marker = [0u8; 16]; + data.copy_to_slice(&mut marker); + if marker != [0xFF; 16] { + warn!("BGP message marker is not all 0xFF bytes (invalid per RFC 4271)"); + } + + Ok(()) +} + /// BGP message /// /// Format: @@ -54,15 +66,7 @@ pub fn parse_bgp_message( ) -> Result { let total_size = data.len(); data.has_n_remaining(19)?; - - // Read and validate BGP marker (RFC 4271: 16 bytes of 0xFF) - let mut marker = [0u8; 16]; - data.copy_to_slice(&mut marker); - if marker != [0xFF; 16] { - // Log warning for invalid marker but continue processing - // Some implementations may use non-standard markers in MRT dumps - warn!("BGP message marker is not all 0xFF bytes (invalid per RFC 4271)"); - } + read_and_validate_bgp_marker(data)?; /* This 2-octet unsigned integer indicates the total length of the diff --git a/src/parser/filter.rs b/src/parser/filter.rs index c0df1b9..bc1f025 100644 --- a/src/parser/filter.rs +++ b/src/parser/filter.rs @@ -108,8 +108,9 @@ the filter type string. For multiple prefixes, use `"prefixes_super"`, `"prefixe ### Note -Currently, only [BgpElem] implements the filtering capability. Support for [MrtRecord] will come in -later releases. +[BgpElem] and [BgpRouteElem] implement the filtering capability. Route-level filtering only has +access to route identity, peer metadata, timestamp, and AS path, so `community` filters do not match +[BgpRouteElem] values. Support for [MrtRecord] will come in later releases. */ use crate::models::*; @@ -522,7 +523,27 @@ impl Filter { pub trait Filterable { fn match_filter(&self, filter: &Filter) -> bool; - fn match_filters(&self, filters: &[Filter]) -> bool; + fn match_filters(&self, filters: &[Filter]) -> bool { + filters.iter().all(|f| self.match_filter(f)) + } +} + +trait RouteFilterView { + fn timestamp(&self) -> f64; + fn elem_type(&self) -> ElemType; + fn peer_ip(&self) -> IpAddr; + fn peer_asn(&self) -> Asn; + fn prefix(&self) -> &NetworkPrefix; + fn as_path(&self) -> Option<&AsPath>; + fn matches_origin_asn(&self, asn: Asn) -> bool; + + fn matches_community(&self, _regex: &ComparableRegex) -> bool { + false + } + + fn supports_community_filter(&self) -> bool { + false + } } const fn same_family(prefix_1: &IpNet, prefix_2: &IpNet) -> bool { @@ -577,62 +598,126 @@ fn prefix_match(match_prefix: &IpNet, input_prefix: &IpNet, t: &PrefixMatchType) } } +fn match_route_view_filter(view: &T, filter: &Filter) -> bool { + match filter { + Filter::Negated(inner) + if matches!(inner.as_ref(), Filter::Community(_)) + && !view.supports_community_filter() => + { + false + } + Filter::Negated(inner) => !match_route_view_filter(view, inner), + Filter::OriginAsn(v) => view.matches_origin_asn((*v).into()), + Filter::OriginAsns(v) => v.iter().any(|asn| view.matches_origin_asn((*asn).into())), + Filter::Prefix(v, t) => prefix_match(v, &view.prefix().prefix, t), + Filter::Prefixes(v, t) => v + .iter() + .any(|prefix| prefix_match(prefix, &view.prefix().prefix, t)), + Filter::PeerIp(v) => view.peer_ip() == *v, + Filter::PeerIps(v) => v.contains(&view.peer_ip()), + Filter::PeerAsn(v) => view.peer_asn().eq(v), + Filter::PeerAsns(v) => v.iter().any(|asn| view.peer_asn().eq(asn)), + Filter::Type(v) => view.elem_type().eq(v), + Filter::TsStart(v) => view.timestamp() >= *v, + Filter::TsEnd(v) => view.timestamp() <= *v, + Filter::AsPath(v) => view + .as_path() + .map(|path| v.is_match(path.to_string().as_str())) + .unwrap_or(false), + Filter::Community(r) => view.supports_community_filter() && view.matches_community(r), + Filter::IpVersion(version) => match version { + IpVersion::Ipv4 => view.prefix().prefix.addr().is_ipv4(), + IpVersion::Ipv6 => view.prefix().prefix.addr().is_ipv6(), + }, + } +} + +impl RouteFilterView for BgpElem { + fn timestamp(&self) -> f64 { + self.timestamp + } + + fn elem_type(&self) -> ElemType { + self.elem_type + } + + fn peer_ip(&self) -> IpAddr { + self.peer_ip + } + + fn peer_asn(&self) -> Asn { + self.peer_asn + } + + fn prefix(&self) -> &NetworkPrefix { + &self.prefix + } + + fn as_path(&self) -> Option<&AsPath> { + self.as_path.as_ref() + } + + fn matches_origin_asn(&self, asn: Asn) -> bool { + self.origin_asns + .as_ref() + .map(|origins| origins.contains(&asn)) + .unwrap_or(false) + } + + fn matches_community(&self, regex: &ComparableRegex) -> bool { + self.communities + .as_ref() + .map(|communities| communities.iter().any(|c| regex.is_match(c.to_string()))) + .unwrap_or(false) + } + + fn supports_community_filter(&self) -> bool { + true + } +} + +impl RouteFilterView for BgpRouteElem { + fn timestamp(&self) -> f64 { + self.timestamp + } + + fn elem_type(&self) -> ElemType { + self.elem_type + } + + fn peer_ip(&self) -> IpAddr { + self.peer_ip + } + + fn peer_asn(&self) -> Asn { + self.peer_asn + } + + fn prefix(&self) -> &NetworkPrefix { + &self.prefix + } + + fn as_path(&self) -> Option<&AsPath> { + self.as_path.as_deref() + } + + fn matches_origin_asn(&self, asn: Asn) -> bool { + self.as_path + .as_ref() + .map(|path| path.iter_origins().any(|origin| origin == asn)) + .unwrap_or(false) + } +} + impl Filterable for BgpElem { fn match_filter(&self, filter: &Filter) -> bool { - match filter { - Filter::Negated(inner) => !self.match_filter(inner), - Filter::OriginAsn(v) => { - let asn: Asn = (*v).into(); - if let Some(origins) = &self.origin_asns { - origins.contains(&asn) - } else { - false - } - } - Filter::OriginAsns(v) => { - if let Some(origins) = &self.origin_asns { - v.iter().any(|asn| { - let asn_obj: Asn = (*asn).into(); - origins.contains(&asn_obj) - }) - } else { - false - } - } - Filter::Prefix(v, t) => prefix_match(v, &self.prefix.prefix, t), - Filter::Prefixes(v, t) => v - .iter() - .any(|prefix| prefix_match(prefix, &self.prefix.prefix, t)), - Filter::PeerIp(v) => self.peer_ip == *v, - Filter::PeerIps(v) => v.contains(&self.peer_ip), - Filter::PeerAsn(v) => self.peer_asn.eq(v), - Filter::PeerAsns(v) => v.iter().any(|asn| self.peer_asn.eq(asn)), - Filter::Type(v) => self.elem_type.eq(v), - Filter::TsStart(v) => self.timestamp >= *v, - Filter::TsEnd(v) => self.timestamp <= *v, - Filter::AsPath(v) => { - if let Some(path) = &self.as_path { - v.is_match(path.to_string().as_str()) - } else { - false - } - } - Filter::Community(r) => { - if let Some(communities) = &self.communities { - communities.iter().any(|c| r.is_match(c.to_string())) - } else { - false - } - } - Filter::IpVersion(version) => match version { - IpVersion::Ipv4 => self.prefix.prefix.addr().is_ipv4(), - IpVersion::Ipv6 => self.prefix.prefix.addr().is_ipv6(), - }, - } + match_route_view_filter(self, filter) } +} - fn match_filters(&self, filters: &[Filter]) -> bool { - filters.iter().all(|f| self.match_filter(f)) +impl Filterable for BgpRouteElem { + fn match_filter(&self, filter: &Filter) -> bool { + match_route_view_filter(self, filter) } } @@ -642,6 +727,58 @@ mod tests { use crate::BgpkitParser; use anyhow::Result; use std::str::FromStr; + use std::sync::Arc; + + fn filter_test_elem() -> BgpElem { + BgpElem { + timestamp: 1637437798_f64, + peer_ip: IpAddr::from_str("192.168.1.1").unwrap(), + peer_asn: Asn::new_32bit(12345), + peer_bgp_id: None, + prefix: NetworkPrefix::new(IpNet::from_str("192.168.1.0/24").unwrap(), None), + next_hop: None, + as_path: Some(AsPath::from_sequence(vec![174, 1916, 52888])), + origin_asns: Some(vec![Asn::new_32bit(52888)]), + origin: None, + local_pref: None, + med: None, + communities: Some(vec![MetaCommunity::Large(LargeCommunity::new( + 12345, + [678910, 111213], + ))]), + atomic: false, + aggr_asn: None, + aggr_ip: None, + only_to_customer: None, + unknown: None, + elem_type: ElemType::ANNOUNCE, + deprecated: None, + } + } + + fn route_projection(elem: &BgpElem) -> BgpRouteElem { + BgpRouteElem { + timestamp: elem.timestamp, + elem_type: elem.elem_type, + peer_ip: elem.peer_ip, + peer_asn: elem.peer_asn, + prefix: elem.prefix, + as_path: elem.as_path.clone().map(Arc::new), + } + } + + #[test] + fn test_route_community_filters_fail_closed() { + let elem = filter_test_elem(); + let route = route_projection(&elem); + let community = Filter::new("community", r"12345:.*").unwrap(); + let negated_community = Filter::new("community", r"!12345:.*").unwrap(); + + assert!(elem.match_filter(&community)); + assert!(!elem.match_filter(&negated_community)); + assert!(!route.match_filter(&community)); + assert!(!route.match_filter(&negated_community)); + } #[test] fn test_filters_on_mrt_file() { @@ -986,34 +1123,11 @@ mod tests { #[test] fn test_filterable_match_filter() { - let elem = BgpElem { - timestamp: 1637437798_f64, - peer_ip: IpAddr::from_str("192.168.1.1").unwrap(), - peer_asn: Asn::new_32bit(12345), - peer_bgp_id: None, - prefix: NetworkPrefix::new(IpNet::from_str("192.168.1.0/24").unwrap(), None), - next_hop: None, - as_path: Some(AsPath::from_sequence(vec![174, 1916, 52888])), - origin_asns: Some(vec![Asn::new_16bit(12345)]), - origin: None, - local_pref: None, - med: None, - communities: Some(vec![MetaCommunity::Large(LargeCommunity::new( - 12345, - [678910, 111213], - ))]), - atomic: false, - aggr_asn: None, - aggr_ip: None, - only_to_customer: None, - unknown: None, - elem_type: ElemType::ANNOUNCE, - deprecated: None, - }; + let elem = filter_test_elem(); let mut filters = vec![]; - let filter = Filter::new("origin_asn", "12345").unwrap(); + let filter = Filter::new("origin_asn", "52888").unwrap(); filters.push(filter.clone()); assert!(elem.match_filter(&filter)); @@ -1062,6 +1176,155 @@ mod tests { assert!(elem.match_filters(&filters)); } + #[test] + fn test_route_filterable_matches_elem_for_route_level_filters() { + let elem = filter_test_elem(); + let route = route_projection(&elem); + + let cases = vec![ + ( + "origin_asn matches", + Filter::new("origin_asn", "52888").unwrap(), + true, + ), + ( + "origin_asn misses", + Filter::new("origin_asn", "64496").unwrap(), + false, + ), + ( + "origin_asns matches", + Filter::new("origin_asns", "64496,52888").unwrap(), + true, + ), + ( + "origin_asns misses", + Filter::new("origin_asns", "64496,64497").unwrap(), + false, + ), + ( + "prefix exact matches", + Filter::new("prefix", "192.168.1.0/24").unwrap(), + true, + ), + ( + "prefix exact misses", + Filter::new("prefix", "192.168.2.0/24").unwrap(), + false, + ), + ( + "prefix_super matches", + Filter::new("prefix_super", "192.168.1.128/25").unwrap(), + true, + ), + ( + "prefix_sub matches", + Filter::new("prefix_sub", "192.168.0.0/23").unwrap(), + true, + ), + ( + "prefix_super_sub matches", + Filter::new("prefix_super_sub", "192.168.1.128/25").unwrap(), + true, + ), + ( + "prefixes matches", + Filter::new("prefixes", "10.0.0.0/8,192.168.1.0/24").unwrap(), + true, + ), + ( + "peer_ip matches", + Filter::new("peer_ip", "192.168.1.1").unwrap(), + true, + ), + ( + "peer_ips matches", + Filter::new("peer_ips", "192.168.1.2,192.168.1.1").unwrap(), + true, + ), + ( + "peer_asn matches", + Filter::new("peer_asn", "12345").unwrap(), + true, + ), + ( + "peer_asns matches", + Filter::new("peer_asns", "12346,12345").unwrap(), + true, + ), + ("type matches", Filter::new("type", "a").unwrap(), true), + ("type misses", Filter::new("type", "w").unwrap(), false), + ( + "ts_start matches", + Filter::new("ts_start", "1637437797").unwrap(), + true, + ), + ( + "ts_end matches", + Filter::new("ts_end", "1637437799").unwrap(), + true, + ), + ( + "as_path matches", + Filter::new("as_path", r"174 1916 52888$").unwrap(), + true, + ), + ( + "as_path misses", + Filter::new("as_path", r"64496$").unwrap(), + false, + ), + ( + "ip_version matches", + Filter::new("ip_version", "4").unwrap(), + true, + ), + ( + "ip_version misses", + Filter::new("ip_version", "6").unwrap(), + false, + ), + ( + "negated origin_asn matches", + Filter::new("origin_asn", "!64496").unwrap(), + true, + ), + ( + "negated origin_asn misses", + Filter::new("origin_asn", "!52888").unwrap(), + false, + ), + ]; + + for (name, filter, expected) in cases { + assert_eq!(elem.match_filter(&filter), expected, "{name} BgpElem"); + assert_eq!(route.match_filter(&filter), expected, "{name} BgpRouteElem"); + assert_eq!( + elem.match_filter(&filter), + route.match_filter(&filter), + "{name} parity" + ); + } + + let filters = vec![ + Filter::new("origin_asn", "52888").unwrap(), + Filter::new("peer_asn", "!64496").unwrap(), + Filter::new("prefix_super", "192.168.1.128/25").unwrap(), + ]; + assert!(elem.match_filters(&filters)); + assert_eq!(elem.match_filters(&filters), route.match_filters(&filters)); + } + + #[test] + fn test_route_filterable_does_not_match_community_filters() { + let elem = filter_test_elem(); + let route = route_projection(&elem); + let filter = Filter::new("community", r"12345:678910:111213$").unwrap(); + + assert!(elem.match_filter(&filter)); + assert!(!route.match_filter(&filter)); + } + #[test] fn test_negated_filters() { let elem = BgpElem { diff --git a/src/parser/iters/mod.rs b/src/parser/iters/mod.rs index bf94cf0..c8897f7 100644 --- a/src/parser/iters/mod.rs +++ b/src/parser/iters/mod.rs @@ -13,12 +13,14 @@ Rust's iterator syntax. pub mod default; pub mod fallible; mod raw; +mod route; mod update; // Re-export all iterator types for convenience pub use default::{ElemIterator, RecordIterator}; pub use fallible::{FallibleElemIterator, FallibleRecordIterator}; pub use raw::RawRecordIterator; +pub use route::{FallibleRouteIterator, RouteIterator}; pub use update::{ Bgp4MpUpdate, FallibleUpdateIterator, MrtUpdate, TableDumpV2Entry, UpdateIterator, }; @@ -29,6 +31,23 @@ use crate::parser::BgpkitParser; use crate::Elementor; use crate::RawMrtRecord; use std::io::Read; +use std::path::Path; + +pub(crate) fn write_mrt_core_dump(enabled: bool, bytes: Option>) { + write_mrt_core_dump_to_path(enabled, bytes, "mrt_core_dump"); +} + +pub(crate) fn write_mrt_core_dump_to_path>( + enabled: bool, + bytes: Option>, + path: P, +) { + if enabled { + if let Some(bytes) = bytes { + std::fs::write(path, bytes).expect("Unable to write to mrt_core_dump"); + } + } +} /// Use [ElemIterator] as the default iterator to return [BgpElem]s instead of [MrtRecord]s. impl IntoIterator for BgpkitParser { @@ -96,6 +115,17 @@ impl BgpkitParser { UpdateIterator::new(self) } + /// Creates an iterator over lightweight route elements from MRT data. + /// + /// This iterator yields [`BgpRouteElem`](crate::models::BgpRouteElem) + /// values and only parses route identity, peer metadata, timestamp, and + /// AS path. Use [`into_elem_iter`](Self::into_elem_iter) when you need + /// the full [`BgpElem`] attribute set. Filters that only depend on route + /// fields are supported; `community` filters do not match route elements. + pub fn into_route_iter(self) -> RouteIterator { + RouteIterator::new(self) + } + /// Creates a fallible iterator over MRT records that returns parsing errors. /// /// # Example @@ -172,6 +202,11 @@ impl BgpkitParser { FallibleUpdateIterator::new(self) } + /// Creates a fallible iterator over lightweight route elements. + pub fn into_fallible_route_iter(self) -> FallibleRouteIterator { + FallibleRouteIterator::new(self) + } + /// Creates an Elementor pre-initialized with PeerIndexTable and an iterator over raw records. /// /// This is useful for parallel processing where the Elementor needs to be shared across threads. diff --git a/src/parser/iters/route.rs b/src/parser/iters/route.rs new file mode 100644 index 0000000..9a1e1ad --- /dev/null +++ b/src/parser/iters/route.rs @@ -0,0 +1,1880 @@ +use crate::error::{ParserError, ParserErrorWithBytes}; +use crate::models::*; +use crate::parser::bgp::attributes::{parse_as_path, parse_nlri, AttributeValidationState}; +use crate::parser::bgp::messages::read_and_validate_bgp_marker; +use crate::parser::iters::write_mrt_core_dump; +use crate::parser::mrt::messages::bgp4mp::bgp4mp_message_payload_len; +use crate::parser::mrt::messages::table_dump_v2::rib_entry_min_len; +use crate::parser::{chunk_mrt_record, parse_nlri_list, BgpkitParser, Filterable, ReadUtils}; +use bytes::{Buf, Bytes}; +use ipnet::IpNet; +use log::{error, warn}; +use std::io::Read; +use std::net::IpAddr; +use std::sync::Arc; + +#[derive(Default)] +struct RouteAttributes { + as_path: Option>, + announced: Vec, + withdrawn: Vec, +} + +struct RouteAttributeContext<'a> { + afi: Option, + safi: Option, + prefixes: Option<&'a [NetworkPrefix]>, + is_announcement: Option, + has_standard_nlri: bool, +} + +fn merge_as_path(as_path: Option, as4_path: Option) -> Option> { + let path = match (as_path, as4_path) { + (None, None) => None, + (Some(path), None) | (None, Some(path)) => Some(path), + (Some(path), Some(as4_path)) => Some(AsPath::merge_aspath_as4path(&path, &as4_path)), + }; + path.map(Arc::new) +} + +fn parse_route_attributes( + mut data: Bytes, + asn_len: &AsnLength, + add_path: bool, + ctx: RouteAttributeContext<'_>, +) -> Result { + let mut validation = AttributeValidationState::new(); + let mut as_path = None; + let mut as4_path = None; + let mut announced = Vec::new(); + let mut withdrawn = Vec::new(); + + while data.remaining() >= 3 { + let flags = AttrFlags::from_bits_retain(data.read_u8()?); + let raw_attr_type = data.read_u8()?; + let attr_length = if flags.contains(AttrFlags::EXTENDED) { + data.read_u16()? as usize + } else { + data.read_u8()? as usize + }; + let attr_type = AttrType::from(raw_attr_type); + let partial = validation.observe_header(raw_attr_type, attr_type, flags, attr_length); + + if data.remaining() < attr_length { + warn!( + "{:?} attribute encodes a length ({}) that is longer than the remaining attribute data ({}). Skipping remaining attribute data for BGP message", + attr_type, + attr_length, + data.remaining() + ); + break; + } + + let attr_data = data.split_to(attr_length); + let result = match attr_type { + AttrType::AS_PATH => parse_as_path(attr_data, asn_len).map(|path| { + as_path = Some(path); + }), + AttrType::AS4_PATH => parse_as_path(attr_data, &AsnLength::Bits32).map(|path| { + as4_path = Some(path); + }), + AttrType::MP_REACHABLE_NLRI => parse_nlri( + attr_data, + &ctx.afi, + &ctx.safi, + &ctx.prefixes, + true, + add_path, + ) + .map(|attr| { + if let AttributeValue::MpReachNlri(nlri) = attr { + announced = nlri.prefixes; + } + }), + AttrType::MP_UNREACHABLE_NLRI => parse_nlri( + attr_data, + &ctx.afi, + &ctx.safi, + &ctx.prefixes, + false, + add_path, + ) + .map(|attr| { + if let AttributeValue::MpUnreachNlri(nlri) = attr { + withdrawn = nlri.prefixes; + } + }), + _ => Ok(()), + }; + + if let Err(err) = result { + validation.observe_parse_error(attr_type, partial, &err); + } + } + + let is_announcement = ctx + .is_announcement + .unwrap_or(ctx.has_standard_nlri || validation.has_attr(AttrType::MP_REACHABLE_NLRI)); + validation.check_mandatory_attributes(is_announcement, ctx.has_standard_nlri); + let _warnings = validation.finish(); + Ok(RouteAttributes { + as_path: merge_as_path(as_path, as4_path), + announced, + withdrawn, + }) +} + +fn record_timestamp(common_header: &CommonHeader) -> f64 { + match common_header.microsecond_timestamp { + Some(microseconds) => common_header.timestamp as f64 + microseconds as f64 / 1_000_000.0, + None => common_header.timestamp as f64, + } +} + +struct RouteUpdateIter { + timestamp: f64, + peer_ip: IpAddr, + peer_asn: Asn, + as_path: Option>, + announced: + std::iter::Chain, std::vec::IntoIter>, + withdrawn: + std::iter::Chain, std::vec::IntoIter>, + in_withdrawn_phase: bool, +} + +impl RouteUpdateIter { + fn next_route(&mut self) -> Option { + if !self.in_withdrawn_phase { + if let Some(prefix) = self.announced.next() { + return Some(BgpRouteElem { + timestamp: self.timestamp, + elem_type: ElemType::ANNOUNCE, + peer_ip: self.peer_ip, + peer_asn: self.peer_asn, + prefix, + as_path: self.as_path.clone(), + }); + } + self.in_withdrawn_phase = true; + } + + self.withdrawn.next().map(|prefix| BgpRouteElem { + timestamp: self.timestamp, + elem_type: ElemType::WITHDRAW, + peer_ip: self.peer_ip, + peer_asn: self.peer_asn, + prefix, + as_path: None, + }) + } +} + +#[derive(Clone, Default)] +struct RoutePeerTable { + peers: Arc<[Peer]>, +} + +impl RoutePeerTable { + fn get_peer_by_id(&self, peer_index: u16) -> Option { + self.peers.get(peer_index as usize).copied() + } +} + +fn parse_route_peer_table(mut data: Bytes) -> Result { + let _collector_bgp_id = data.read_u32()?; + let view_name_length = data.read_u16()? as usize; + data.has_n_remaining(view_name_length)?; + data.advance(view_name_length); + + let peer_count = data.read_u16()? as usize; + let mut peers = Vec::with_capacity(peer_count); + for _ in 0..peer_count { + let peer_type = PeerType::from_bits_retain(data.read_u8()?); + let afi = if peer_type.contains(PeerType::ADDRESS_FAMILY_IPV6) { + Afi::Ipv6 + } else { + Afi::Ipv4 + }; + let asn_len = if peer_type.contains(PeerType::AS_SIZE_32BIT) { + AsnLength::Bits32 + } else { + AsnLength::Bits16 + }; + + let peer_bgp_id = data.read_ipv4_address()?; + let peer_ip = data.read_address(&afi)?; + let peer_asn = data.read_asn(asn_len)?; + peers.push(Peer { + peer_type, + peer_bgp_id, + peer_ip, + peer_asn, + }); + } + + Ok(RoutePeerTable { + peers: Arc::from(peers), + }) +} + +#[derive(Default)] +enum RouteRecordIter { + #[default] + Empty, + One(Option), + Update(RouteUpdateIter), + RibAfi(RouteRibAfiIter), +} + +impl RouteRecordIter { + fn next_route(&mut self) -> Result, ParserError> { + match self { + RouteRecordIter::Empty => Ok(None), + RouteRecordIter::One(route) => Ok(route.take()), + RouteRecordIter::Update(iter) => Ok(iter.next_route()), + RouteRecordIter::RibAfi(iter) => iter.next_route(), + } + } +} + +struct RouteRibAfiIter { + data: Bytes, + peer_table: RoutePeerTable, + afi: Afi, + safi: Safi, + is_add_path: bool, + prefix: NetworkPrefix, + remaining_entries: u16, +} + +impl RouteRibAfiIter { + fn next_route(&mut self) -> Result, ParserError> { + while self.remaining_entries > 0 { + if self.data.remaining() < rib_entry_min_len(self.is_add_path) { + warn!("early break due to truncated msg while parsing RIB AFI entries"); + self.remaining_entries = 0; + return Ok(None); + } + + self.remaining_entries -= 1; + let peer_index = self.data.read_u16()?; + let originated_time = self.data.read_u32()? as f64; + let _path_id = if self.is_add_path { + Some(self.data.read_u32()?) + } else { + None + }; + let attribute_length = self.data.read_u16()? as usize; + if self.data.remaining() < attribute_length { + warn!( + "early break due to truncated attribute payload while parsing RIB AFI entries: expected {} bytes, have {} bytes available", + attribute_length, + self.data.remaining() + ); + self.remaining_entries = 0; + return Ok(None); + } + + let prefixes = [self.prefix]; + let attrs = parse_route_attributes( + self.data.split_to(attribute_length), + &AsnLength::Bits32, + self.is_add_path, + RouteAttributeContext { + afi: Some(self.afi), + safi: Some(self.safi), + prefixes: Some(&prefixes), + is_announcement: Some(true), + has_standard_nlri: self.afi == Afi::Ipv4, + }, + )?; + let Some(peer) = self.peer_table.get_peer_by_id(peer_index) else { + error!("peer ID {} not found in peer_index table", peer_index); + continue; + }; + + return Ok(Some(BgpRouteElem { + timestamp: originated_time, + elem_type: ElemType::ANNOUNCE, + peer_ip: peer.peer_ip, + peer_asn: peer.peer_asn, + prefix: self.prefix, + as_path: attrs.as_path, + })); + } + + Ok(None) + } +} + +fn parse_bgp_update_routes( + mut input: Bytes, + add_path: bool, + asn_len: &AsnLength, + timestamp: f64, + peer_ip: IpAddr, + peer_asn: Asn, +) -> Result { + let withdrawn_len = input.read_u16()? as usize; + input.has_n_remaining(withdrawn_len)?; + let withdrawn_prefixes = parse_nlri_list(input.split_to(withdrawn_len), add_path, &Afi::Ipv4)?; + + let attribute_length = input.read_u16()? as usize; + input.has_n_remaining(attribute_length)?; + let attribute_bytes = input.split_to(attribute_length); + let announced_prefixes = parse_nlri_list(input, add_path, &Afi::Ipv4)?; + let attributes = parse_route_attributes( + attribute_bytes, + asn_len, + add_path, + RouteAttributeContext { + afi: None, + safi: None, + prefixes: None, + is_announcement: None, + has_standard_nlri: !announced_prefixes.is_empty(), + }, + )?; + + Ok(RouteUpdateIter { + timestamp, + peer_ip, + peer_asn, + as_path: attributes.as_path, + announced: announced_prefixes.into_iter().chain(attributes.announced), + withdrawn: withdrawn_prefixes.into_iter().chain(attributes.withdrawn), + in_withdrawn_phase: false, + }) +} + +fn parse_bgp_message_routes( + mut data: Bytes, + add_path: bool, + asn_len: &AsnLength, + timestamp: f64, + peer_ip: IpAddr, + peer_asn: Asn, +) -> Result { + let total_size = data.len(); + data.has_n_remaining(19)?; + read_and_validate_bgp_marker(&mut data)?; + let length = data.read_u16()?; + if !(19..=65_535).contains(&length) { + return Err(ParserError::ParseError(format!( + "invalid BGP message length {length}" + ))); + } + + let bgp_msg_length = if length as usize > total_size { + total_size - 19 + } else { + length as usize - 19 + }; + let msg_type = BgpMessageType::try_from(data.read_u8()?) + .map_err(|_| ParserError::ParseError("Unknown BGP Message Type".to_string()))?; + + if matches!(msg_type, BgpMessageType::OPEN | BgpMessageType::KEEPALIVE) && length > 4096 { + return Err(ParserError::ParseError(format!( + "BGP {msg_type:?} message length {length} exceeds maximum allowed 4096 bytes (RFC 8654)" + ))); + } + + if data.remaining() != bgp_msg_length { + warn!( + "BGP message length {} does not match the actual length {} (parsing BGP message)", + bgp_msg_length, + data.remaining() + ); + } + data.has_n_remaining(bgp_msg_length)?; + let msg_data = data.split_to(bgp_msg_length); + + match msg_type { + BgpMessageType::UPDATE => Ok(RouteRecordIter::Update(parse_bgp_update_routes( + msg_data, add_path, asn_len, timestamp, peer_ip, peer_asn, + )?)), + BgpMessageType::OPEN | BgpMessageType::NOTIFICATION | BgpMessageType::KEEPALIVE => { + Ok(RouteRecordIter::Empty) + } + } +} + +fn bgp4mp_asn_len_and_add_path(msg_type: Bgp4MpType) -> Option<(AsnLength, bool)> { + match msg_type { + Bgp4MpType::Message | Bgp4MpType::MessageLocal => Some((AsnLength::Bits16, false)), + Bgp4MpType::MessageAs4 | Bgp4MpType::MessageAs4Local => Some((AsnLength::Bits32, false)), + Bgp4MpType::MessageAddpath | Bgp4MpType::MessageLocalAddpath => { + Some((AsnLength::Bits16, true)) + } + Bgp4MpType::MessageAs4Addpath | Bgp4MpType::MessageLocalAs4Addpath => { + Some((AsnLength::Bits32, true)) + } + Bgp4MpType::StateChange | Bgp4MpType::StateChangeAs4 => None, + } +} + +fn parse_bgp4mp_routes( + sub_type: u16, + mut data: Bytes, + timestamp: f64, +) -> Result { + let msg_type = Bgp4MpType::try_from(sub_type)?; + let Some((asn_len, add_path)) = bgp4mp_asn_len_and_add_path(msg_type) else { + return Ok(RouteRecordIter::Empty); + }; + + let total_size = data.len(); + let peer_asn = data.read_asn(asn_len)?; + let _local_asn = data.read_asn(asn_len)?; + let _interface_index = data.read_u16()?; + let afi = data.read_afi()?; + let peer_ip = data.read_address(&afi)?; + let _local_ip = data.read_address(&afi)?; + + let should_read = bgp4mp_message_payload_len(&afi, &asn_len, total_size); + if should_read != data.remaining() { + return Err(ParserError::TruncatedMsg(format!( + "truncated bgp4mp message: should read {} bytes, have {} bytes available", + should_read, + data.remaining() + ))); + } + + parse_bgp_message_routes(data, add_path, &asn_len, timestamp, peer_ip, peer_asn) +} + +fn table_dump_v2_afi_safi(rib_type: TableDumpV2Type) -> Result<(Afi, Safi), ParserError> { + match rib_type { + TableDumpV2Type::RibIpv4Unicast | TableDumpV2Type::RibIpv4UnicastAddPath => { + Ok((Afi::Ipv4, Safi::Unicast)) + } + TableDumpV2Type::RibIpv4Multicast | TableDumpV2Type::RibIpv4MulticastAddPath => { + Ok((Afi::Ipv4, Safi::Multicast)) + } + TableDumpV2Type::RibIpv6Unicast | TableDumpV2Type::RibIpv6UnicastAddPath => { + Ok((Afi::Ipv6, Safi::Unicast)) + } + TableDumpV2Type::RibIpv6Multicast | TableDumpV2Type::RibIpv6MulticastAddPath => { + Ok((Afi::Ipv6, Safi::Multicast)) + } + _ => Err(ParserError::ParseError(format!( + "wrong RIB type for parsing: {rib_type:?}" + ))), + } +} + +fn is_add_path_rib_type(rib_type: TableDumpV2Type) -> bool { + matches!( + rib_type, + TableDumpV2Type::RibIpv4UnicastAddPath + | TableDumpV2Type::RibIpv4MulticastAddPath + | TableDumpV2Type::RibIpv6UnicastAddPath + | TableDumpV2Type::RibIpv6MulticastAddPath + ) +} + +fn parse_table_dump_routes(sub_type: u16, mut data: Bytes) -> Result { + let afi = match sub_type { + 1 => Afi::Ipv4, + 2 => Afi::Ipv6, + _ => { + return Err(ParserError::ParseError(format!( + "Invalid subtype found for TABLE_DUMP (V1) message: {sub_type}" + ))) + } + }; + + let _view_number = data.read_u16()?; + let _sequence_number = data.read_u16()?; + let prefix = match &afi { + Afi::Ipv4 => data.read_ipv4_prefix().map(IpNet::V4), + Afi::Ipv6 => data.read_ipv6_prefix().map(IpNet::V6), + Afi::LinkState => unreachable!(), + }?; + let _status = data.read_u8()?; + let originated_time = data.read_u32()? as f64; + let peer_ip = data.read_address(&afi)?; + let peer_asn = Asn::new_16bit(data.read_u16()?); + let attribute_length = data.read_u16()? as usize; + data.has_n_remaining(attribute_length)?; + let attrs = parse_route_attributes( + data.split_to(attribute_length), + &AsnLength::Bits16, + false, + RouteAttributeContext { + afi: None, + safi: None, + prefixes: None, + is_announcement: Some(true), + has_standard_nlri: afi == Afi::Ipv4, + }, + )?; + + Ok(RouteRecordIter::One(Some(BgpRouteElem { + timestamp: originated_time, + elem_type: ElemType::ANNOUNCE, + peer_ip, + peer_asn, + prefix: NetworkPrefix::new(prefix, None), + as_path: attrs.as_path, + }))) +} + +fn parse_table_dump_v2_routes( + sub_type: u16, + mut data: Bytes, + peer_table: &mut Option, +) -> Result { + let v2_type = TableDumpV2Type::try_from(sub_type)?; + match v2_type { + TableDumpV2Type::PeerIndexTable => { + *peer_table = Some(parse_route_peer_table(data)?); + Ok(RouteRecordIter::Empty) + } + TableDumpV2Type::GeoPeerTable => Ok(RouteRecordIter::Empty), + TableDumpV2Type::RibGeneric | TableDumpV2Type::RibGenericAddPath => Err( + ParserError::Unsupported("TableDumpV2 RibGeneric is not currently supported".into()), + ), + rib_type => { + let (afi, safi) = table_dump_v2_afi_safi(rib_type)?; + let is_add_path = is_add_path_rib_type(rib_type); + let _sequence_number = data.read_u32()?; + let prefix = data.read_nlri_prefix(&afi, false)?; + let entry_count = data.read_u16()?; + let Some(peer_table) = peer_table.clone() else { + return Err(ParserError::ParseError( + "peer table not set for TableDumpV2 RIB entries".to_string(), + )); + }; + + Ok(RouteRecordIter::RibAfi(RouteRibAfiIter { + data, + peer_table, + afi, + safi, + is_add_path, + prefix, + remaining_entries: entry_count, + })) + } + } +} + +fn parse_raw_record_route_iter( + raw_record: crate::RawMrtRecord, + peer_table: &mut Option, +) -> Result { + let timestamp = record_timestamp(&raw_record.common_header); + match raw_record.common_header.entry_type { + EntryType::TABLE_DUMP => parse_table_dump_routes( + raw_record.common_header.entry_subtype, + raw_record.message_bytes, + ), + EntryType::TABLE_DUMP_V2 => parse_table_dump_v2_routes( + raw_record.common_header.entry_subtype, + raw_record.message_bytes, + peer_table, + ), + EntryType::BGP4MP | EntryType::BGP4MP_ET => parse_bgp4mp_routes( + raw_record.common_header.entry_subtype, + raw_record.message_bytes, + timestamp, + ), + v => Err(ParserError::Unsupported(format!( + "unsupported MRT type: {v:?}" + ))), + } +} + +pub struct RouteIterator { + parser: BgpkitParser, + pending_routes: RouteRecordIter, + peer_table: Option, +} + +impl RouteIterator { + pub(crate) fn new(parser: BgpkitParser) -> Self { + Self { + parser, + pending_routes: RouteRecordIter::Empty, + peer_table: None, + } + } +} + +impl Iterator for RouteIterator { + type Item = BgpRouteElem; + + fn next(&mut self) -> Option { + loop { + match self.pending_routes.next_route() { + Ok(Some(route)) => { + if route.match_filters(&self.parser.filters) { + return Some(route); + } + continue; + } + Ok(None) => {} + Err(err) => { + error!("parser error: {}", err); + self.pending_routes = RouteRecordIter::Empty; + if self.parser.core_dump { + return None; + } + continue; + } + } + + let raw_record = match chunk_mrt_record(&mut self.parser.reader) { + Ok(raw_record) => raw_record, + Err(e) => match e.error { + ParserError::TruncatedMsg(err_str) | ParserError::Unsupported(err_str) => { + if self.parser.options.show_warnings { + warn!("parser warn: {}", err_str); + } + write_mrt_core_dump(self.parser.core_dump, e.bytes); + continue; + } + ParserError::ParseError(err_str) => { + error!("parser error: {}", err_str); + if self.parser.core_dump { + write_mrt_core_dump(true, e.bytes); + return None; + } + continue; + } + ParserError::EofExpected => return None, + ParserError::IoError(err) | ParserError::EofError(err) => { + error!("{:?}", err); + write_mrt_core_dump(self.parser.core_dump, e.bytes); + return None; + } + #[cfg(feature = "oneio")] + ParserError::OneIoError(_) => return None, + ParserError::FilterError(_) => return None, + ParserError::InvalidLabeledNlriLength + | ParserError::TruncatedLabeledNlri + | ParserError::TruncatedPrefix + | ParserError::MaxLabelStackDepthExceeded + | ParserError::PeerMaxLabelsExceeded + | ParserError::InvalidPrefix => { + if self.parser.options.show_warnings { + warn!("parser warn: labeled NLRI parsing error: {:?}", e.error); + } + continue; + } + }, + }; + + match parse_raw_record_route_iter(raw_record, &mut self.peer_table) { + Ok(routes) => { + self.pending_routes = routes; + } + Err(err) => { + error!("parser error: {}", err); + if self.parser.core_dump { + return None; + } + continue; + } + } + } + } +} + +pub struct FallibleRouteIterator { + parser: BgpkitParser, + pending_routes: RouteRecordIter, + peer_table: Option, +} + +impl FallibleRouteIterator { + pub(crate) fn new(parser: BgpkitParser) -> Self { + Self { + parser, + pending_routes: RouteRecordIter::Empty, + peer_table: None, + } + } +} + +impl Iterator for FallibleRouteIterator { + type Item = Result; + + fn next(&mut self) -> Option { + loop { + match self.pending_routes.next_route() { + Ok(Some(route)) => { + if route.match_filters(&self.parser.filters) { + return Some(Ok(route)); + } + continue; + } + Ok(None) => {} + Err(error) => { + self.pending_routes = RouteRecordIter::Empty; + return Some(Err(ParserErrorWithBytes { error, bytes: None })); + } + } + + let raw_record = match chunk_mrt_record(&mut self.parser.reader) { + Ok(raw_record) => raw_record, + Err(e) if matches!(e.error, ParserError::EofExpected) => return None, + Err(e) => return Some(Err(e)), + }; + + match parse_raw_record_route_iter(raw_record, &mut self.peer_table) { + Ok(routes) => { + self.pending_routes = routes; + } + Err(error) => return Some(Err(ParserErrorWithBytes { error, bytes: None })), + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::parser::iters::write_mrt_core_dump_to_path; + use bytes::{BufMut, BytesMut}; + use std::io::Cursor; + use std::net::{Ipv4Addr, Ipv6Addr}; + use std::str::FromStr; + + fn route_projection(elem: BgpElem) -> BgpRouteElem { + BgpRouteElem { + timestamp: elem.timestamp, + elem_type: elem.elem_type, + peer_ip: elem.peer_ip, + peer_asn: elem.peer_asn, + prefix: elem.prefix, + as_path: elem.as_path.map(Arc::new), + } + } + + fn collect_route_record_iter( + mut iter: RouteRecordIter, + ) -> Result, ParserError> { + let mut routes = Vec::new(); + while let Some(route) = iter.next_route()? { + routes.push(route); + } + Ok(routes) + } + + fn route_peer_table_from_peer_index(peer_table: PeerIndexTable) -> RoutePeerTable { + let mut peer_ids = peer_table.id_peer_map.keys().copied().collect::>(); + peer_ids.sort_unstable(); + let peers = peer_ids + .into_iter() + .map(|peer_id| peer_table.id_peer_map[&peer_id]) + .collect::>(); + + RoutePeerTable { + peers: Arc::from(peers), + } + } + + fn update_record() -> MrtRecord { + let mut attributes = Attributes::default(); + attributes.add_attr(AttributeValue::Origin(Origin::IGP).into()); + attributes.add_attr( + AttributeValue::AsPath { + path: AsPath::from_sequence([64500, 64501]), + is_as4: false, + } + .into(), + ); + attributes + .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into()); + + MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_000, + microsecond_timestamp: None, + entry_type: EntryType::BGP4MP, + entry_subtype: Bgp4MpType::MessageAs4 as u16, + length: 0, + }, + message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(Bgp4MpMessage { + msg_type: Bgp4MpType::MessageAs4, + peer_asn: Asn::new_32bit(64496), + local_asn: Asn::new_32bit(64497), + interface_index: 0, + peer_ip: IpAddr::from_str("192.0.2.1").unwrap(), + local_ip: IpAddr::from_str("192.0.2.2").unwrap(), + bgp_message: BgpMessage::Update(BgpUpdateMessage { + withdrawn_prefixes: vec![NetworkPrefix::from_str("198.51.100.0/24").unwrap()], + attributes, + announced_prefixes: vec![NetworkPrefix::from_str("203.0.113.0/24").unwrap()], + }), + })), + } + } + + fn route_attributes(as_path: impl AsRef<[u32]>) -> Attributes { + let mut attributes = Attributes::default(); + attributes.add_attr(AttributeValue::Origin(Origin::IGP).into()); + attributes.add_attr( + AttributeValue::AsPath { + path: AsPath::from_sequence(as_path), + is_as4: false, + } + .into(), + ); + attributes + .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into()); + attributes + } + + fn bgp4mp_record(msg_type: Bgp4MpType, bgp_message: BgpMessage) -> MrtRecord { + let asn = if matches!( + msg_type, + Bgp4MpType::Message + | Bgp4MpType::MessageLocal + | Bgp4MpType::MessageAddpath + | Bgp4MpType::MessageLocalAddpath + ) { + Asn::new_16bit(64496) + } else { + Asn::new_32bit(64496) + }; + + MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_000, + microsecond_timestamp: None, + entry_type: EntryType::BGP4MP, + entry_subtype: msg_type as u16, + length: 0, + }, + message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(Bgp4MpMessage { + msg_type, + peer_asn: asn, + local_asn: Asn::new_32bit(64497), + interface_index: 0, + peer_ip: IpAddr::from_str("192.0.2.1").unwrap(), + local_ip: IpAddr::from_str("192.0.2.2").unwrap(), + bgp_message, + })), + } + } + + fn open_message() -> BgpMessage { + BgpMessage::Open(BgpOpenMessage { + version: 4, + asn: Asn::new_16bit(64496), + hold_time: 180, + bgp_identifier: Ipv4Addr::new(192, 0, 2, 1), + extended_length: false, + opt_params: vec![], + }) + } + + fn raw_bgp_message(length: u16, msg_type: BgpMessageType, payload: &[u8]) -> Bytes { + raw_bgp_message_with_marker([0xff; 16], length, msg_type, payload) + } + + fn raw_bgp_message_with_marker( + marker: [u8; 16], + length: u16, + msg_type: BgpMessageType, + payload: &[u8], + ) -> Bytes { + let mut bytes = BytesMut::new(); + bytes.put_slice(&marker); + bytes.put_u16(length); + bytes.put_u8(msg_type as u8); + bytes.put_slice(payload); + bytes.freeze() + } + + fn table_dump_record() -> MrtRecord { + let mut attributes = Attributes::default(); + attributes.add_attr(AttributeValue::Origin(Origin::IGP).into()); + attributes.add_attr( + AttributeValue::AsPath { + path: AsPath::from_sequence([64500, 64501]), + is_as4: false, + } + .into(), + ); + attributes + .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into()); + + MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_000, + microsecond_timestamp: None, + entry_type: EntryType::TABLE_DUMP, + entry_subtype: 1, + length: 0, + }, + message: MrtMessage::TableDumpMessage(TableDumpMessage { + view_number: 0, + sequence_number: 1, + prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(), + status: 1, + originated_time: 1_699_999_998, + peer_ip: IpAddr::from_str("192.0.2.20").unwrap(), + peer_asn: Asn::new_16bit(64496), + attributes, + }), + } + } + + fn table_dump_ipv6_record() -> MrtRecord { + let mut attributes = Attributes::default(); + attributes.add_attr(AttributeValue::Origin(Origin::IGP).into()); + attributes.add_attr( + AttributeValue::AsPath { + path: AsPath::from_sequence([64500, 64501]), + is_as4: false, + } + .into(), + ); + + MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_000, + microsecond_timestamp: None, + entry_type: EntryType::TABLE_DUMP, + entry_subtype: 2, + length: 0, + }, + message: MrtMessage::TableDumpMessage(TableDumpMessage { + view_number: 0, + sequence_number: 1, + prefix: NetworkPrefix::from_str("2001:db8::/32").unwrap(), + status: 1, + originated_time: 1_699_999_998, + peer_ip: IpAddr::from_str("2001:db8::20").unwrap(), + peer_asn: Asn::new_16bit(64496), + attributes, + }), + } + } + + fn table_dump_v2_records_bytes() -> Vec { + let peer = Peer::new( + "192.0.2.10".parse().unwrap(), + "192.0.2.11".parse().unwrap(), + Asn::new_32bit(64496), + ); + let mut peer_table = PeerIndexTable::default(); + let peer_index = peer_table.add_peer(peer); + + let mut attributes = Attributes::default(); + attributes.add_attr(AttributeValue::Origin(Origin::IGP).into()); + attributes.add_attr( + AttributeValue::AsPath { + path: AsPath::from_sequence([64500, 64501]), + is_as4: false, + } + .into(), + ); + attributes + .add_attr(AttributeValue::NextHop(IpAddr::from_str("192.0.2.254").unwrap()).into()); + + let pit_record = MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_000, + microsecond_timestamp: None, + entry_type: EntryType::TABLE_DUMP_V2, + entry_subtype: TableDumpV2Type::PeerIndexTable as u16, + length: 0, + }, + message: MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(peer_table)), + }; + let rib_record = MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_001, + microsecond_timestamp: None, + entry_type: EntryType::TABLE_DUMP_V2, + entry_subtype: TableDumpV2Type::RibIpv4Unicast as u16, + length: 0, + }, + message: MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(RibAfiEntries { + rib_type: TableDumpV2Type::RibIpv4Unicast, + sequence_number: 1, + prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(), + rib_entries: vec![RibEntry { + peer_index, + originated_time: 1_699_999_999, + path_id: None, + attributes, + }], + })), + }; + + let mut bytes = pit_record.encode().to_vec(); + bytes.extend_from_slice(&rib_record.encode()); + bytes + } + + fn table_dump_v2_truncated_attribute_payload() -> (Vec, Bytes, PeerIndexTable) { + let peer = Peer::new( + "192.0.2.10".parse().unwrap(), + "192.0.2.11".parse().unwrap(), + Asn::new_32bit(64496), + ); + let mut peer_table = PeerIndexTable::default(); + let peer_index = peer_table.add_peer(peer); + + let pit_record = MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_000, + microsecond_timestamp: None, + entry_type: EntryType::TABLE_DUMP_V2, + entry_subtype: TableDumpV2Type::PeerIndexTable as u16, + length: 0, + }, + message: MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable( + peer_table.clone(), + )), + }; + + let first_entry = RibEntry { + peer_index, + originated_time: 1_699_999_999, + path_id: None, + attributes: route_attributes([64500, 64501]), + }; + + let mut rib_body = BytesMut::new(); + rib_body.put_u32(1); + rib_body.extend(NetworkPrefix::from_str("203.0.113.0/24").unwrap().encode()); + rib_body.put_u16(2); + rib_body.extend(first_entry.encode()); + rib_body.put_u16(peer_index); + rib_body.put_u32(1_699_999_998); + rib_body.put_u16(32); + rib_body.put_u8(0); + + let rib_body = rib_body.freeze(); + let rib_header = CommonHeader { + timestamp: 1_700_000_001, + microsecond_timestamp: None, + entry_type: EntryType::TABLE_DUMP_V2, + entry_subtype: TableDumpV2Type::RibIpv4Unicast as u16, + length: rib_body.len() as u32, + }; + + let mut bytes = pit_record.encode().to_vec(); + bytes.extend_from_slice(&rib_header.encode()); + bytes.extend_from_slice(&rib_body); + + (bytes, rib_body, peer_table) + } + + fn assert_filtered_route_projection(bytes: Vec, filters: &[(&str, &str)]) { + let elem_parser = filters.iter().fold( + BgpkitParser::from_reader(Cursor::new(bytes.clone())), + |parser, (filter_type, filter_value)| { + parser.add_filter(filter_type, filter_value).unwrap() + }, + ); + let route_parser = filters.iter().fold( + BgpkitParser::from_reader(Cursor::new(bytes)), + |parser, (filter_type, filter_value)| { + parser.add_filter(filter_type, filter_value).unwrap() + }, + ); + + let elem_projection = elem_parser + .into_elem_iter() + .map(route_projection) + .collect::>(); + let routes = route_parser.into_route_iter().collect::>(); + + assert_eq!(routes, elem_projection, "filters: {filters:?}"); + } + + fn assert_route_projection(bytes: Vec) -> Vec { + let elem_projection = BgpkitParser::from_reader(Cursor::new(bytes.clone())) + .into_elem_iter() + .map(route_projection) + .collect::>(); + let routes = BgpkitParser::from_reader(Cursor::new(bytes)) + .into_route_iter() + .collect::>(); + + assert_eq!(routes, elem_projection); + routes + } + + #[test] + fn route_iterator_matches_elem_projection_for_update() { + let bytes = update_record().encode().to_vec(); + let routes = assert_route_projection(bytes); + assert_eq!(routes.len(), 2); + assert_eq!(routes[0].elem_type, ElemType::ANNOUNCE); + assert_eq!(routes[1].elem_type, ElemType::WITHDRAW); + assert!(routes[1].as_path.is_none()); + } + + #[test] + fn route_iterator_shares_as_path_for_update_announcements() { + let bytes = bgp4mp_record( + Bgp4MpType::MessageAs4, + BgpMessage::Update(BgpUpdateMessage { + withdrawn_prefixes: vec![], + attributes: route_attributes([64500, 64501]), + announced_prefixes: vec![ + NetworkPrefix::from_str("203.0.113.0/24").unwrap(), + NetworkPrefix::from_str("198.51.100.0/24").unwrap(), + ], + }), + ) + .encode() + .to_vec(); + + let routes = BgpkitParser::from_reader(Cursor::new(bytes)) + .into_route_iter() + .collect::>(); + + assert_eq!(routes.len(), 2); + assert!(Arc::ptr_eq( + routes[0].as_path.as_ref().unwrap(), + routes[1].as_path.as_ref().unwrap() + )); + } + + #[test] + fn route_iterator_uses_microsecond_timestamps() { + let timestamp = record_timestamp(&CommonHeader { + timestamp: 1_700_000_000, + microsecond_timestamp: Some(123_456), + entry_type: EntryType::BGP4MP_ET, + entry_subtype: Bgp4MpType::MessageAs4 as u16, + length: 0, + }); + + assert_eq!(timestamp, 1_700_000_000.123_456); + } + + #[test] + fn route_iterator_matches_elem_projection_for_mp_update() { + let mut attributes = route_attributes([64500, 64501]); + attributes.add_attr( + AttributeValue::MpReachNlri(Nlri::new_reachable( + NetworkPrefix::from_str("2001:db8::/32").unwrap(), + Some(IpAddr::from_str("2001:db8::1").unwrap()), + )) + .into(), + ); + attributes.add_attr( + AttributeValue::MpUnreachNlri(Nlri::new_unreachable( + NetworkPrefix::from_str("2001:db8:1::/48").unwrap(), + )) + .into(), + ); + + let bytes = bgp4mp_record( + Bgp4MpType::MessageAs4, + BgpMessage::Update(BgpUpdateMessage { + withdrawn_prefixes: vec![], + attributes, + announced_prefixes: vec![], + }), + ) + .encode() + .to_vec(); + + let routes = assert_route_projection(bytes); + assert_eq!(routes.len(), 2); + assert_eq!(routes[0].elem_type, ElemType::ANNOUNCE); + assert_eq!( + routes[0].prefix, + NetworkPrefix::from_str("2001:db8::/32").unwrap() + ); + assert_eq!(routes[1].elem_type, ElemType::WITHDRAW); + assert_eq!( + routes[1].prefix, + NetworkPrefix::from_str("2001:db8:1::/48").unwrap() + ); + } + + #[test] + fn route_iterator_matches_elem_projection_for_non_update_bgp4mp_messages() { + let records = [ + bgp4mp_record(Bgp4MpType::Message, open_message()), + bgp4mp_record( + Bgp4MpType::MessageAs4, + BgpMessage::Notification(BgpNotificationMessage { + error: BgpError::Unknown(1, 0), + data: vec![], + }), + ), + bgp4mp_record(Bgp4MpType::MessageAddpath, BgpMessage::KeepAlive), + bgp4mp_record(Bgp4MpType::MessageAs4Addpath, BgpMessage::KeepAlive), + ]; + let mut bytes = Vec::new(); + for record in records { + bytes.extend_from_slice(&record.encode()); + } + + assert!(assert_route_projection(bytes).is_empty()); + } + + #[test] + fn route_iterator_matches_elem_projection_for_bgp4mp_16bit_update() { + let bytes = bgp4mp_record( + Bgp4MpType::Message, + BgpMessage::Update(BgpUpdateMessage { + withdrawn_prefixes: vec![], + attributes: route_attributes([64500, 64501]), + announced_prefixes: vec![NetworkPrefix::from_str("203.0.113.0/24").unwrap()], + }), + ) + .encode() + .to_vec(); + + let routes = assert_route_projection(bytes); + assert_eq!(routes.len(), 1); + assert_eq!(routes[0].peer_asn, Asn::new_16bit(64496)); + } + + #[test] + fn route_iterator_filters_match_elem_projection_for_update() { + let bytes = update_record().encode().to_vec(); + let cases: &[&[(&str, &str)]] = &[ + &[("peer_ip", "192.0.2.1")], + &[("peer_ip", "192.0.2.99")], + &[("peer_asn", "64496")], + &[("type", "a")], + &[("type", "w")], + &[("type", "!w")], + &[("prefix", "203.0.113.0/24")], + &[("prefix", "198.51.100.0/24")], + &[("prefix_super", "203.0.113.128/25")], + &[("origin_asn", "64501")], + &[("origin_asns", "64496,64501")], + &[("as_path", "64500 64501$")], + &[("ip_version", "4")], + &[("ts_start", "1700000000"), ("ts_end", "1700000000")], + &[("peer_ip", "192.0.2.1"), ("type", "a")], + ]; + + for filters in cases { + assert_filtered_route_projection(bytes.clone(), filters); + } + } + + #[test] + fn selective_attribute_parser_merges_as4_path() { + let mut attributes = Attributes::default(); + attributes.add_attr( + AttributeValue::AsPath { + path: AsPath::from_sequence([23456, 64497]), + is_as4: false, + } + .into(), + ); + attributes.add_attr( + AttributeValue::AsPath { + path: AsPath::from_sequence([65536, 64497]), + is_as4: true, + } + .into(), + ); + + let attrs = parse_route_attributes( + attributes.encode(AsnLength::Bits16), + &AsnLength::Bits16, + false, + RouteAttributeContext { + afi: None, + safi: None, + prefixes: None, + is_announcement: Some(true), + has_standard_nlri: true, + }, + ) + .unwrap(); + + assert_eq!( + attrs.as_path.unwrap().to_u32_vec_opt(false).unwrap(), + vec![65536, 64497] + ); + } + + #[test] + fn selective_attribute_parser_handles_as_path_without_as4_path() { + let attrs = parse_route_attributes( + route_attributes([64500, 64501]).encode(AsnLength::Bits16), + &AsnLength::Bits16, + false, + RouteAttributeContext { + afi: None, + safi: None, + prefixes: None, + is_announcement: Some(true), + has_standard_nlri: true, + }, + ) + .unwrap(); + + assert_eq!( + attrs.as_path.unwrap().to_u32_vec_opt(false).unwrap(), + vec![64500, 64501] + ); + } + + #[test] + fn selective_attribute_parser_handles_as4_path_without_as_path() { + let mut attributes = Attributes::default(); + attributes.add_attr( + AttributeValue::AsPath { + path: AsPath::from_sequence([65536, 64497]), + is_as4: true, + } + .into(), + ); + + let attrs = parse_route_attributes( + attributes.encode(AsnLength::Bits16), + &AsnLength::Bits16, + false, + RouteAttributeContext { + afi: None, + safi: None, + prefixes: None, + is_announcement: Some(false), + has_standard_nlri: false, + }, + ) + .unwrap(); + + assert_eq!( + attrs.as_path.unwrap().to_u32_vec_opt(false).unwrap(), + vec![65536, 64497] + ); + } + + #[test] + fn selective_attribute_parser_handles_no_as_path() { + let attrs = parse_route_attributes( + Bytes::new(), + &AsnLength::Bits16, + false, + RouteAttributeContext { + afi: None, + safi: None, + prefixes: None, + is_announcement: Some(false), + has_standard_nlri: false, + }, + ) + .unwrap(); + + assert!(attrs.as_path.is_none()); + } + + #[test] + fn selective_attribute_parser_handles_extended_and_truncated_attributes() { + let mut extended_as_path = BytesMut::new(); + extended_as_path.put_u8((AttrFlags::TRANSITIVE | AttrFlags::EXTENDED).bits()); + extended_as_path.put_u8(u8::from(AttrType::AS_PATH)); + extended_as_path.put_u16(4); + extended_as_path.put_u8(2); + extended_as_path.put_u8(1); + extended_as_path.put_u16(64500); + + let attrs = parse_route_attributes( + extended_as_path.freeze(), + &AsnLength::Bits16, + false, + RouteAttributeContext { + afi: None, + safi: None, + prefixes: None, + is_announcement: Some(false), + has_standard_nlri: false, + }, + ) + .unwrap(); + assert_eq!( + attrs.as_path.unwrap().to_u32_vec_opt(false).unwrap(), + vec![64500] + ); + + let attrs = parse_route_attributes( + Bytes::from_static(&[0x40, 2, 5, 0]), + &AsnLength::Bits16, + false, + RouteAttributeContext { + afi: None, + safi: None, + prefixes: None, + is_announcement: Some(false), + has_standard_nlri: false, + }, + ) + .unwrap(); + assert!(attrs.as_path.is_none()); + } + + #[test] + fn selective_attribute_parser_discards_malformed_as_path() { + let attrs = parse_route_attributes( + Bytes::from_static(&[0x40, 2, 1, 0]), + &AsnLength::Bits16, + false, + RouteAttributeContext { + afi: None, + safi: None, + prefixes: None, + is_announcement: Some(false), + has_standard_nlri: false, + }, + ) + .unwrap(); + + assert!(attrs.as_path.is_none()); + } + + #[test] + fn route_iterator_matches_elem_projection_for_table_dump() { + let bytes = table_dump_record().encode().to_vec(); + let routes = assert_route_projection(bytes); + assert_eq!(routes.len(), 1); + assert_eq!(routes[0].timestamp, 1_699_999_998.0); + assert_eq!(routes[0].peer_asn, Asn::new_16bit(64496)); + } + + #[test] + fn route_iterator_matches_elem_projection_for_table_dump_ipv6() { + let bytes = table_dump_ipv6_record().encode().to_vec(); + let routes = assert_route_projection(bytes); + assert_eq!(routes.len(), 1); + assert_eq!( + routes[0].prefix, + NetworkPrefix::from_str("2001:db8::/32").unwrap() + ); + assert_eq!( + routes[0].peer_ip, + IpAddr::from(Ipv6Addr::from_str("2001:db8::20").unwrap()) + ); + } + + #[test] + fn route_iterator_matches_elem_projection_for_table_dump_v2() { + let bytes = table_dump_v2_records_bytes(); + let routes = assert_route_projection(bytes); + assert_eq!(routes.len(), 1); + assert_eq!(routes[0].elem_type, ElemType::ANNOUNCE); + assert_eq!( + routes[0].as_path.as_ref().unwrap().to_u32_vec_opt(false), + Some(vec![64500, 64501]) + ); + } + + #[test] + fn route_iterator_matches_elem_projection_for_table_dump_v2_ipv6_addpath() { + let peer = Peer::new( + "192.0.2.11".parse().unwrap(), + "2001:db8::10".parse().unwrap(), + Asn::new_32bit(64496), + ); + let mut peer_table = PeerIndexTable::default(); + let peer_index = peer_table.add_peer(peer); + + let pit_record = MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_000, + microsecond_timestamp: None, + entry_type: EntryType::TABLE_DUMP_V2, + entry_subtype: TableDumpV2Type::PeerIndexTable as u16, + length: 0, + }, + message: MrtMessage::TableDumpV2Message(TableDumpV2Message::PeerIndexTable(peer_table)), + }; + let rib_record = MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_001, + microsecond_timestamp: None, + entry_type: EntryType::TABLE_DUMP_V2, + entry_subtype: TableDumpV2Type::RibIpv6UnicastAddPath as u16, + length: 0, + }, + message: MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(RibAfiEntries { + rib_type: TableDumpV2Type::RibIpv6UnicastAddPath, + sequence_number: 1, + prefix: NetworkPrefix::from_str("2001:db8::/32").unwrap(), + rib_entries: vec![RibEntry { + peer_index, + originated_time: 1_699_999_999, + path_id: Some(1234), + attributes: route_attributes([64500, 64501]), + }], + })), + }; + + let mut bytes = pit_record.encode().to_vec(); + bytes.extend_from_slice(&rib_record.encode()); + let routes = assert_route_projection(bytes); + assert_eq!(routes.len(), 1); + assert_eq!( + routes[0].prefix, + NetworkPrefix::from_str("2001:db8::/32").unwrap() + ); + } + + #[test] + fn route_iterator_matches_elem_projection_for_bgp4mp_ipv6_peer_update() { + let record = MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_000, + microsecond_timestamp: None, + entry_type: EntryType::BGP4MP, + entry_subtype: Bgp4MpType::MessageAs4 as u16, + length: 0, + }, + message: MrtMessage::Bgp4Mp(Bgp4MpEnum::Message(Bgp4MpMessage { + msg_type: Bgp4MpType::MessageAs4, + peer_asn: Asn::new_32bit(64496), + local_asn: Asn::new_32bit(64497), + interface_index: 0, + peer_ip: IpAddr::from_str("2001:db8::1").unwrap(), + local_ip: IpAddr::from_str("2001:db8::2").unwrap(), + bgp_message: BgpMessage::Update(BgpUpdateMessage { + withdrawn_prefixes: vec![], + attributes: route_attributes([64500, 64501]), + announced_prefixes: vec![NetworkPrefix::from_str("203.0.113.0/24").unwrap()], + }), + })), + }; + + let routes = assert_route_projection(record.encode().to_vec()); + assert_eq!(routes.len(), 1); + assert_eq!( + routes[0].peer_ip, + IpAddr::from(Ipv6Addr::from_str("2001:db8::1").unwrap()) + ); + } + + #[test] + fn route_iterator_filters_match_elem_projection_for_table_dump_v2() { + let bytes = table_dump_v2_records_bytes(); + let cases: &[&[(&str, &str)]] = &[ + &[("peer_ip", "192.0.2.10")], + &[("peer_asn", "64496")], + &[("type", "a")], + &[("type", "w")], + &[("prefix", "203.0.113.0/24")], + &[("prefix_sub", "203.0.112.0/23")], + &[("origin_asn", "64501")], + &[("as_path", "64500 64501$")], + &[("ts_start", "1699999999"), ("ts_end", "1699999999")], + &[("peer_asn", "64496"), ("origin_asn", "64501")], + ]; + + for filters in cases { + assert_filtered_route_projection(bytes.clone(), filters); + } + } + + #[test] + fn route_parser_reports_bgp_message_shape_errors() { + assert!(parse_bgp_message_routes( + raw_bgp_message(18, BgpMessageType::KEEPALIVE, &[]), + false, + &AsnLength::Bits16, + 1_700_000_000.0, + "192.0.2.1".parse().unwrap(), + Asn::new_16bit(64496) + ) + .is_err()); + assert!(parse_bgp_message_routes( + raw_bgp_message(4097, BgpMessageType::OPEN, &[]), + false, + &AsnLength::Bits16, + 1_700_000_000.0, + "192.0.2.1".parse().unwrap(), + Asn::new_16bit(64496) + ) + .is_err()); + + let routes = collect_route_record_iter( + parse_bgp_message_routes( + raw_bgp_message(30, BgpMessageType::KEEPALIVE, &[]), + false, + &AsnLength::Bits16, + 1_700_000_000.0, + "192.0.2.1".parse().unwrap(), + Asn::new_16bit(64496), + ) + .unwrap(), + ) + .unwrap(); + assert!(routes.is_empty()); + + let routes = collect_route_record_iter( + parse_bgp_message_routes( + raw_bgp_message(19, BgpMessageType::KEEPALIVE, &[0]), + false, + &AsnLength::Bits16, + 1_700_000_000.0, + "192.0.2.1".parse().unwrap(), + Asn::new_16bit(64496), + ) + .unwrap(), + ) + .unwrap(); + assert!(routes.is_empty()); + + let routes = collect_route_record_iter( + parse_bgp_message_routes( + raw_bgp_message_with_marker([0x00; 16], 19, BgpMessageType::KEEPALIVE, &[]), + false, + &AsnLength::Bits16, + 1_700_000_000.0, + "192.0.2.1".parse().unwrap(), + Asn::new_16bit(64496), + ) + .unwrap(), + ) + .unwrap(); + assert!(routes.is_empty()); + } + + #[test] + fn route_core_dump_write_respects_enabled_flag() { + let dir = tempfile::tempdir().unwrap(); + let path = dir.path().join("mrt_core_dump"); + + write_mrt_core_dump_to_path(false, Some(vec![1, 2, 3]), &path); + assert!(!path.exists()); + + write_mrt_core_dump_to_path(true, Some(vec![1, 2, 3]), &path); + assert_eq!(std::fs::read(&path).unwrap(), vec![1, 2, 3]); + } + + #[test] + fn route_parser_handles_table_dump_v2_error_edges() { + let rib = RibAfiEntries { + rib_type: TableDumpV2Type::RibIpv4Unicast, + sequence_number: 1, + prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(), + rib_entries: vec![RibEntry { + peer_index: 99, + originated_time: 1_699_999_999, + path_id: None, + attributes: route_attributes([64500, 64501]), + }], + }; + let mut no_peer_table = None; + assert!(parse_table_dump_v2_routes( + TableDumpV2Type::RibIpv4Unicast as u16, + rib.encode(), + &mut no_peer_table, + ) + .is_err()); + + let mut empty_peer_table = Some(RoutePeerTable::default()); + let routes = collect_route_record_iter( + parse_table_dump_v2_routes( + TableDumpV2Type::RibIpv4Unicast as u16, + rib.encode(), + &mut empty_peer_table, + ) + .unwrap(), + ) + .unwrap(); + assert!(routes.is_empty()); + + let mut truncated = BytesMut::new(); + truncated.put_u32(1); + truncated.extend(NetworkPrefix::from_str("203.0.113.0/24").unwrap().encode()); + truncated.put_u16(1); + let mut empty_peer_table = Some(RoutePeerTable::default()); + let routes = collect_route_record_iter( + parse_table_dump_v2_routes( + TableDumpV2Type::RibIpv4Unicast as u16, + truncated.freeze(), + &mut empty_peer_table, + ) + .unwrap(), + ) + .unwrap(); + assert!(routes.is_empty()); + + let peer = Peer::new( + "192.0.2.10".parse().unwrap(), + "192.0.2.11".parse().unwrap(), + Asn::new_32bit(64496), + ); + let mut peer_table = PeerIndexTable::default(); + let peer_index = peer_table.add_peer(peer); + + let first_entry = RibEntry { + peer_index, + originated_time: 1_699_999_999, + path_id: Some(1234), + attributes: route_attributes([64500, 64501]), + }; + let mut add_path_truncated = BytesMut::new(); + add_path_truncated.put_u32(1); + add_path_truncated.extend(NetworkPrefix::from_str("203.0.113.0/24").unwrap().encode()); + add_path_truncated.put_u16(2); + add_path_truncated.extend(first_entry.encode()); + add_path_truncated.put_u16(peer_index); + add_path_truncated.put_u32(1_699_999_998); + add_path_truncated.put_u32(5678); + + let mut peer_table = Some(route_peer_table_from_peer_index(peer_table)); + let routes = collect_route_record_iter( + parse_table_dump_v2_routes( + TableDumpV2Type::RibIpv4UnicastAddPath as u16, + add_path_truncated.freeze(), + &mut peer_table, + ) + .unwrap(), + ) + .unwrap(); + assert_eq!(routes.len(), 1); + assert_eq!( + routes[0].prefix, + NetworkPrefix::from_str("203.0.113.0/24").unwrap() + ); + } + + #[test] + fn route_parser_preserves_table_dump_v2_routes_before_truncated_attribute_payload() { + let (_bytes, rib_body, peer_table) = table_dump_v2_truncated_attribute_payload(); + let mut peer_table = Some(route_peer_table_from_peer_index(peer_table)); + + let routes = collect_route_record_iter( + parse_table_dump_v2_routes( + TableDumpV2Type::RibIpv4Unicast as u16, + rib_body, + &mut peer_table, + ) + .unwrap(), + ) + .unwrap(); + + assert_eq!(routes.len(), 1); + assert_eq!( + routes[0].prefix, + NetworkPrefix::from_str("203.0.113.0/24").unwrap() + ); + assert_eq!( + routes[0].as_path.as_ref().unwrap().to_u32_vec_opt(false), + Some(vec![64500, 64501]) + ); + } + + #[test] + fn route_iterators_preserve_table_dump_v2_routes_before_truncated_attribute_payload() { + let (bytes, _rib_body, _peer_table) = table_dump_v2_truncated_attribute_payload(); + + let routes = BgpkitParser::from_reader(Cursor::new(bytes.clone())) + .into_route_iter() + .collect::>(); + assert_eq!(routes.len(), 1); + assert_eq!( + routes[0].prefix, + NetworkPrefix::from_str("203.0.113.0/24").unwrap() + ); + + let fallible_routes = BgpkitParser::from_reader(Cursor::new(bytes)) + .into_fallible_route_iter() + .collect::, _>>() + .unwrap(); + assert_eq!(fallible_routes, routes); + } + + fn table_dump_v2_rib_without_peer_table_record() -> MrtRecord { + MrtRecord { + common_header: CommonHeader { + timestamp: 1_700_000_001, + microsecond_timestamp: None, + entry_type: EntryType::TABLE_DUMP_V2, + entry_subtype: TableDumpV2Type::RibIpv4Unicast as u16, + length: 0, + }, + message: MrtMessage::TableDumpV2Message(TableDumpV2Message::RibAfi(RibAfiEntries { + rib_type: TableDumpV2Type::RibIpv4Unicast, + sequence_number: 1, + prefix: NetworkPrefix::from_str("203.0.113.0/24").unwrap(), + rib_entries: vec![RibEntry { + peer_index: 0, + originated_time: 1_699_999_999, + path_id: None, + attributes: route_attributes([64500, 64501]), + }], + })), + } + } + + #[test] + fn route_iterator_skips_route_parse_errors() { + let routes = BgpkitParser::from_reader(Cursor::new( + table_dump_v2_rib_without_peer_table_record() + .encode() + .to_vec(), + )) + .into_route_iter() + .collect::>(); + + assert!(routes.is_empty()); + } + + #[test] + fn fallible_route_iterator_applies_filters_to_cached_routes() { + let routes = BgpkitParser::from_reader(Cursor::new(update_record().encode().to_vec())) + .add_filter("type", "w") + .unwrap() + .into_fallible_route_iter() + .collect::, _>>() + .unwrap(); + + assert_eq!(routes.len(), 1); + assert_eq!(routes[0].elem_type, ElemType::WITHDRAW); + } + + #[test] + fn fallible_route_iterator_returns_route_parse_errors() { + let mut iter = BgpkitParser::from_reader(Cursor::new( + table_dump_v2_rib_without_peer_table_record() + .encode() + .to_vec(), + )) + .into_fallible_route_iter(); + + assert!(iter.next().unwrap().is_err()); + } + + #[test] + fn fallible_route_iterator_yields_routes() { + let bytes = update_record().encode().to_vec(); + let routes = BgpkitParser::from_reader(Cursor::new(bytes)) + .into_fallible_route_iter() + .collect::, _>>() + .unwrap(); + + assert_eq!(routes.len(), 2); + assert_eq!(routes[0].elem_type, ElemType::ANNOUNCE); + assert_eq!(routes[1].elem_type, ElemType::WITHDRAW); + } + + #[test] + fn fallible_route_iterator_returns_parse_errors() { + let invalid_data = vec![ + 0x00, 0x00, 0x00, 0x00, // timestamp + 0xFF, 0xFF, // invalid type + 0x00, 0x00, // subtype + 0x00, 0x00, 0x00, 0x04, // length + 0x00, 0x00, 0x00, 0x00, // dummy data + ]; + + let mut iter = + BgpkitParser::from_reader(Cursor::new(invalid_data)).into_fallible_route_iter(); + + assert!(iter.next().unwrap().is_err()); + } +} diff --git a/src/parser/mrt/messages/bgp4mp.rs b/src/parser/mrt/messages/bgp4mp.rs index 0a4e517..8986628 100644 --- a/src/parser/mrt/messages/bgp4mp.rs +++ b/src/parser/mrt/messages/bgp4mp.rs @@ -39,7 +39,58 @@ pub fn parse_bgp4mp(sub_type: u16, input: Bytes) -> Result usize { +/// Return the embedded BGP message length in a BGP4MP message body. +/// +/// The BGP4MP envelope is defined by RFC 6396 Section 4.4.2 for 16-bit ASNs +/// and Section 4.4.3 for AS4 variants: +/// +/// +/// +/// RFC 8050 Section 3 defines the ADDPATH BGP4MP subtypes that reuse the same +/// envelope before the encapsulated BGP message: +/// +/// +/// `total_size` is the MRT message body length. Subtracting the peer/local ASNs, +/// interface index, AFI, and peer/local IP addresses leaves the encapsulated BGP +/// message length. +/* +4.4.2. BGP4MP_MESSAGE Subtype: + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Peer AS Number | Local AS Number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Interface Index | Address Family | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Peer IP Address (variable) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Local IP Address (variable) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | BGP Message... (variable) + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + +4.4.3. BGP4MP_MESSAGE_AS4 Subtype + 0 1 2 3 + 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Peer AS Number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Local AS Number | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Interface Index | Address Family | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Peer IP Address (variable) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | Local IP Address (variable) | + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ + | BGP Message... (variable) + +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+ +*/ +pub(crate) fn bgp4mp_message_payload_len( + afi: &Afi, + asn_len: &AsnLength, + total_size: usize, +) -> usize { let ip_size = match afi { Afi::Ipv4 => 4 * 2, Afi::Ipv6 => 16 * 2, @@ -81,7 +132,7 @@ pub fn parse_bgp4mp_message( let peer_ip = data.read_address(&afi)?; let local_ip = data.read_address(&afi)?; - let should_read = total_should_read(&afi, &asn_len, total_size); + let should_read = bgp4mp_message_payload_len(&afi, &asn_len, total_size); if should_read != data.remaining() { return Err(ParserError::TruncatedMsg(format!( "truncated bgp4mp message: should read {} bytes, have {} bytes available", diff --git a/src/parser/mrt/messages/table_dump_v2/mod.rs b/src/parser/mrt/messages/table_dump_v2/mod.rs index d2ccef0..e39b292 100644 --- a/src/parser/mrt/messages/table_dump_v2/mod.rs +++ b/src/parser/mrt/messages/table_dump_v2/mod.rs @@ -6,6 +6,7 @@ use crate::error::ParserError; use crate::messages::table_dump_v2::geo_peer_table::parse_geo_peer_table; use crate::messages::table_dump_v2::peer_index_table::parse_peer_index_table; use crate::messages::table_dump_v2::rib_afi_entries::parse_rib_afi_entries; +pub(crate) use crate::messages::table_dump_v2::rib_afi_entries::rib_entry_min_len; use crate::models::*; #[cfg(test)] use bytes::BufMut; diff --git a/src/parser/mrt/messages/table_dump_v2/rib_afi_entries.rs b/src/parser/mrt/messages/table_dump_v2/rib_afi_entries.rs index 59e5c55..02a354f 100644 --- a/src/parser/mrt/messages/table_dump_v2/rib_afi_entries.rs +++ b/src/parser/mrt/messages/table_dump_v2/rib_afi_entries.rs @@ -47,6 +47,10 @@ fn is_add_path_rib_type(rib_type: TableDumpV2Type) -> bool { ) } +pub(crate) const fn rib_entry_min_len(is_add_path: bool) -> usize { + 2 /*peer_index*/ + 4 /*time*/ + 2 /*attr_len*/ + if is_add_path { 4 } else { 0 } +} + /// RIB AFI-specific entries /// /// https://tools.ietf.org/html/rfc6396#section-4.3 @@ -65,8 +69,7 @@ pub fn parse_rib_afi_entries( let entry_count = data.read_u16()?; // Pre-allocate cautiously to avoid overflow/OOM with malformed inputs - let min_entry_size = - 2 /*peer_index*/ + 4 /*time*/ + 2 /*attr_len*/ + if is_add_path { 4 } else { 0 }; + let min_entry_size = rib_entry_min_len(is_add_path); let max_possible = data.remaining() / min_entry_size; let reserve = (entry_count as usize).min(max_possible).saturating_mul(2); let mut rib_entries = Vec::with_capacity(reserve);