From 821663459603674f23dc04fa97b40050aa61d6c5 Mon Sep 17 00:00:00 2001 From: Mingwei Zhang Date: Tue, 9 Dec 2025 12:38:12 -0800 Subject: [PATCH] refactor: move CLI argument definitions to command submodules - Move CLI argument structs from main monocle.rs to individual command modules - Each command module now contains its own Args struct using clap::Args - Reduces main file from ~1600 lines to ~140 lines - Improves code organization and maintainability - No functional changes to CLI behavior --- CHANGELOG.md | 4 + Cargo.lock | 26 +- Cargo.toml | 3 +- src/bin/commands/broker.rs | 144 +++ src/bin/commands/country.rs | 22 + src/bin/commands/ip.rs | 44 + src/bin/commands/mod.rs | 31 + src/bin/commands/parse.rs | 109 +++ src/bin/commands/pfx2as.rs | 95 ++ src/bin/commands/radar.rs | 223 +++++ src/bin/commands/rpki.rs | 326 +++++++ src/bin/commands/search.rs | 601 +++++++++++++ src/bin/commands/time.rs | 31 + src/bin/commands/whois.rs | 168 ++++ src/bin/monocle.rs | 1700 +---------------------------------- 15 files changed, 1839 insertions(+), 1688 deletions(-) create mode 100644 src/bin/commands/broker.rs create mode 100644 src/bin/commands/country.rs create mode 100644 src/bin/commands/ip.rs create mode 100644 src/bin/commands/mod.rs create mode 100644 src/bin/commands/parse.rs create mode 100644 src/bin/commands/pfx2as.rs create mode 100644 src/bin/commands/radar.rs create mode 100644 src/bin/commands/rpki.rs create mode 100644 src/bin/commands/search.rs create mode 100644 src/bin/commands/time.rs create mode 100644 src/bin/commands/whois.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index dd5a36e..ef23d08 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,10 @@ All notable changes to this project will be documented in this file. * **Table formatting**: ASPA table output now wraps long provider lists at 60 characters for better readability +### Code Improvements + +* Refactored CLI command modules: moved CLI argument definitions from main file to individual command submodules for better code organization and maintainability + ### Dependencies * Added `bgpkit-commons` v0.10 with features: `asinfo`, `rpki`, `countries` diff --git a/Cargo.lock b/Cargo.lock index e702c65..9e30966 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -214,7 +214,7 @@ dependencies = [ "chrono", "ipnet", "ipnet-trie", - "oneio 0.20.0", + "oneio", "regex", "reqwest 0.12.24", "serde", @@ -226,9 +226,9 @@ dependencies = [ [[package]] name = "bgpkit-parser" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d499d85a2285cbd2ab3bb84a727d0b93f62ba35e5758f5cc432ffad56eb071c1" +checksum = "6b7b1569f2ecafd08ddfb94f6627f8b038a77dd2be684b789fcbf6abb3e07c61" dependencies = [ "bitflags 2.10.0", "bytes", @@ -237,7 +237,7 @@ dependencies = [ "itertools 0.14.0", "log", "num_enum", - "oneio 0.19.2", + "oneio", "regex", "serde", ] @@ -1709,10 +1709,9 @@ dependencies = [ "ipnet-trie", "itertools 0.14.0", "json_to_table", - "oneio 0.20.0", + "oneio", "radar-rs", "rayon", - "regex", "rusqlite", "serde", "serde_json", @@ -1807,21 +1806,6 @@ version = "1.70.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" -[[package]] -name = "oneio" -version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "215a69ce9148910859f500c5f4d261fe1fb06cfe093f31f51abf216be06bd552" -dependencies = [ - "bzip2", - "dotenvy", - "flate2", - "reqwest 0.12.24", - "rust-s3", 
- "suppaftp", - "thiserror 2.0.17", -] - [[package]] name = "oneio" version = "0.20.0" diff --git a/Cargo.toml b/Cargo.toml index 8ebcf3a..d45c17e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ path = "src/bin/monocle.rs" [dependencies] anyhow = "1.0" bgpkit-broker = "0.10.1" -bgpkit-parser = { version = "0.12.0", features = ["serde"] } +bgpkit-parser = { version = "0.13.0", features = ["serde"] } config = { version = "0.15", features = ["toml"] } chrono = "0.4" chrono-humanize = "0.2" @@ -36,7 +36,6 @@ json_to_table = "0.12.0" oneio = { version = "0.20.0", default-features = false, features = ["https", "gz", "bz", "json"] } radar-rs = "0.1.0" rayon = "1.8" -regex = "1.10" bgpkit-commons = { version = "0.10", features = ["asinfo", "rpki", "countries"] } rusqlite = { version = "0.37", features = ["bundled"] } serde = { version = "1.0", features = ["derive"] } diff --git a/src/bin/commands/broker.rs b/src/bin/commands/broker.rs new file mode 100644 index 0000000..f88de4e --- /dev/null +++ b/src/bin/commands/broker.rs @@ -0,0 +1,144 @@ +use clap::Args; +use monocle::string_to_time; +use serde_json; +use tabled::settings::Style; +use tabled::{Table, Tabled}; + +/// Arguments for the Broker command +#[derive(Args)] +pub struct BrokerArgs { + /// starting timestamp (RFC3339 or unix epoch) + #[clap(long, short = 't')] + pub start_ts: String, + + /// ending timestamp (RFC3339 or unix epoch) + #[clap(long, short = 'T')] + pub end_ts: String, + + /// BGP collector name: e.g. rrc00, route-views2 + #[clap(long, short = 'c')] + pub collector: Option, + + /// BGP collection project name, e.g. routeviews, or riperis + #[clap(long, short = 'P')] + pub project: Option, + + /// Data type, e.g., updates or rib + #[clap(long)] + pub data_type: Option, + + /// Page number to fetch (1-based). If set, only this page will be fetched. 
+
+pub fn run(args: BrokerArgs, json: bool) {
+    let BrokerArgs {
+        start_ts,
+        end_ts,
+        collector,
+        project,
+        data_type,
+        page,
+        page_size,
+    } = args;
+
+    // parse time strings similar to Search subcommand
+    let ts_start = match string_to_time(&start_ts) {
+        Ok(t) => t.timestamp(),
+        Err(_) => {
+            eprintln!("start-ts is not a valid time string: {}", start_ts);
+            std::process::exit(1);
+        }
+    };
+    let ts_end = match string_to_time(&end_ts) {
+        Ok(t) => t.timestamp(),
+        Err(_) => {
+            eprintln!("end-ts is not a valid time string: {}", end_ts);
+            std::process::exit(1);
+        }
+    };
+
+    let mut broker = bgpkit_broker::BgpkitBroker::new()
+        .ts_start(ts_start)
+        .ts_end(ts_end);
+
+    if let Some(c) = collector {
+        broker = broker.collector_id(c.as_str());
+    }
+    if let Some(p) = project {
+        broker = broker.project(p.as_str());
+    }
+    if let Some(dt) = data_type {
+        broker = broker.data_type(dt.as_str());
+    }
+
+    let page_size = page_size.unwrap_or(1000);
+    broker = broker.page_size(page_size);
+
+    let res = if let Some(p) = page {
+        broker.page(p).query_single_page()
+    } else {
+        // Use query() and limit to at most 10 pages' worth of items
+        match broker.query() {
+            Ok(mut v) => {
+                let max_items = (page_size * 10) as usize;
+                if v.len() > max_items {
+                    v.truncate(max_items);
+                }
+                Ok(v)
+            }
+            Err(e) => Err(e),
+        }
+    };
+
+    match res {
+        Ok(items) => {
+            if items.is_empty() {
+                println!("No MRT files found");
+                return;
+            }
+
+            if json {
+                match serde_json::to_string_pretty(&items) {
+                    Ok(json_str) => println!("{}", json_str),
+                    Err(e) => eprintln!("error serializing: {}", e),
+                }
+            } else {
+                #[derive(Tabled)]
+                struct BrokerItemDisplay {
+                    #[tabled(rename = "Collector")]
+                    collector_id: String,
+                    #[tabled(rename = "Type")]
+                    data_type: String,
+                    #[tabled(rename = "Start Time (UTC)")]
+                    ts_start: String,
+                    #[tabled(rename = "URL")]
+                    url: String,
+                    #[tabled(rename = "Size (Bytes)")]
+                    rough_size: i64,
+                }
+
+                let display_items: Vec<BrokerItemDisplay> = items
+                    .into_iter()
+                    .map(|item| BrokerItemDisplay {
+                        collector_id: item.collector_id,
+                        data_type: item.data_type,
+                        ts_start: item.ts_start.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
+                        url: item.url,
+                        rough_size: item.rough_size,
+                    })
+                    .collect();
+
+                println!("{}", Table::new(display_items).with(Style::markdown()));
+            }
+        }
+        Err(e) => {
+            eprintln!("failed to query: {}", e);
+        }
+    }
+}
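+
+// A minimal parsing sanity check for the extracted argument struct; a
+// hypothetical test (not part of the original refactor) that wraps BrokerArgs
+// in a throwaway Parser, while the real binary consumes it as the `Broker`
+// subcommand variant in monocle.rs.
+#[cfg(test)]
+mod tests {
+    use super::BrokerArgs;
+    use clap::Parser;
+
+    #[derive(Parser)]
+    struct TestCli {
+        #[clap(flatten)]
+        args: BrokerArgs,
+    }
+
+    #[test]
+    fn parses_required_and_optional_flags() -> Result<(), clap::Error> {
+        let cli = TestCli::try_parse_from([
+            "test", "-t", "1700000000", "-T", "1700003600", "-c", "rrc00",
+        ])?;
+        assert_eq!(cli.args.collector.as_deref(), Some("rrc00"));
+        assert_eq!(cli.args.page, None);
+        Ok(())
+    }
+}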
"US" or "United States" + pub queries: Vec, +} + +pub fn run(args: CountryArgs) { + let CountryArgs { queries } = args; + + let lookup = CountryLookup::new(); + let res: Vec = queries + .into_iter() + .flat_map(|query| lookup.lookup(query.as_str())) + .collect(); + println!("{}", Table::new(res).with(Style::rounded())); +} diff --git a/src/bin/commands/ip.rs b/src/bin/commands/ip.rs new file mode 100644 index 0000000..2724d8a --- /dev/null +++ b/src/bin/commands/ip.rs @@ -0,0 +1,44 @@ +use clap::Args; +use json_to_table::json_to_table; +use monocle::fetch_ip_info; +use serde_json::json; +use std::net::IpAddr; + +/// Arguments for the Ip command +#[derive(Args)] +pub struct IpArgs { + /// IP address to look up (optional) + #[clap()] + pub ip: Option, + + /// Print IP address only (e.g., for getting the public IP address quickly) + #[clap(long)] + pub simple: bool, +} + +pub fn run(args: IpArgs, json: bool) { + let IpArgs { ip, simple } = args; + + match fetch_ip_info(ip, simple) { + Ok(ipinfo) => { + if simple { + println!("{}", ipinfo.ip); + return; + } + + let json_value = json!(&ipinfo); + if json { + if let Err(e) = serde_json::to_writer_pretty(std::io::stdout(), &json_value) { + eprintln!("Error writing JSON to stdout: {}", e); + } + } else { + let mut table = json_to_table(&json_value); + table.collapse(); + println!("{}", table); + } + } + Err(e) => { + eprintln!("ERROR: unable to get ip information: {e}"); + } + } +} diff --git a/src/bin/commands/mod.rs b/src/bin/commands/mod.rs new file mode 100644 index 0000000..ec7cf04 --- /dev/null +++ b/src/bin/commands/mod.rs @@ -0,0 +1,31 @@ +pub mod broker; +pub mod country; +pub mod ip; +pub mod parse; +pub mod pfx2as; +pub mod radar; +pub mod rpki; +pub mod search; +pub mod time; +pub mod whois; + +pub(crate) fn elem_to_string( + elem: &bgpkit_parser::BgpElem, + json: bool, + pretty: bool, + collector: &str, +) -> Result { + if json { + let mut val = serde_json::json!(elem); + val.as_object_mut() + .ok_or_else(|| anyhow::anyhow!("Expected JSON object"))? + .insert("collector".to_string(), collector.into()); + if pretty { + Ok(serde_json::to_string_pretty(&val)?) + } else { + Ok(val.to_string()) + } + } else { + Ok(format!("{}|{}", elem, collector)) + } +} diff --git a/src/bin/commands/parse.rs b/src/bin/commands/parse.rs new file mode 100644 index 0000000..0d10ef3 --- /dev/null +++ b/src/bin/commands/parse.rs @@ -0,0 +1,109 @@ +use std::io::Write; +use std::path::PathBuf; + +use bgpkit_parser::encoder::MrtUpdatesEncoder; +use clap::Args; + +use monocle::{MrtParserFilters, ParseFilters}; + +use crate::elem_to_string; + +/// Arguments for the Parse command +#[derive(Args)] +pub struct ParseArgs { + /// File path to an MRT file, local or remote. 
diff --git a/src/bin/commands/parse.rs b/src/bin/commands/parse.rs
new file mode 100644
index 0000000..0d10ef3
--- /dev/null
+++ b/src/bin/commands/parse.rs
+use std::io::Write;
+use std::path::PathBuf;
+
+use bgpkit_parser::encoder::MrtUpdatesEncoder;
+use clap::Args;
+
+use monocle::{MrtParserFilters, ParseFilters};
+
+use crate::elem_to_string;
+
+/// Arguments for the Parse command
+#[derive(Args)]
+pub struct ParseArgs {
+    /// File path to an MRT file, local or remote.
+    #[clap(name = "FILE")]
+    pub file_path: PathBuf,
+
+    /// Pretty-print JSON output
+    #[clap(long)]
+    pub pretty: bool,
+
+    /// MRT output file path
+    #[clap(long, short = 'M')]
+    pub mrt_path: Option<PathBuf>,
+
+    /// Message filters, including the AS path regex string
+    #[clap(flatten)]
+    pub filters: ParseFilters,
+}
+
+pub fn run(args: ParseArgs, json: bool) {
+    let ParseArgs {
+        file_path,
+        pretty,
+        mrt_path,
+        filters,
+    } = args;
+
+    if let Err(e) = filters.validate() {
+        eprintln!("ERROR: {e}");
+        return;
+    }
+
+    let file_path = match file_path.to_str() {
+        Some(path) => path,
+        None => {
+            eprintln!("Invalid file path");
+            std::process::exit(1);
+        }
+    };
+    let parser = match filters.to_parser(file_path) {
+        Ok(p) => p,
+        Err(e) => {
+            eprintln!("Failed to create parser for {}: {}", file_path, e);
+            std::process::exit(1);
+        }
+    };
+
+    let mut stdout = std::io::stdout();
+
+    match mrt_path {
+        None => {
+            for elem in parser {
+                // output to stdout
+                let output_str = match elem_to_string(&elem, json, pretty, "") {
+                    Ok(s) => s,
+                    Err(e) => {
+                        eprintln!("Failed to format element: {}", e);
+                        continue;
+                    }
+                };
+                if let Err(e) = writeln!(stdout, "{}", &output_str) {
+                    if e.kind() != std::io::ErrorKind::BrokenPipe {
+                        eprintln!("ERROR: {e}");
+                    }
+                    std::process::exit(1);
+                }
+            }
+        }
+        Some(p) => {
+            let path = match p.to_str() {
+                Some(path) => path.to_string(),
+                None => {
+                    eprintln!("Invalid MRT path");
+                    std::process::exit(1);
+                }
+            };
+            println!("processing. filtered messages output to {}...", &path);
+            let mut encoder = MrtUpdatesEncoder::new();
+            let mut writer = match oneio::get_writer(&path) {
+                Ok(w) => w,
+                Err(e) => {
+                    eprintln!("ERROR: {e}");
+                    std::process::exit(1);
+                }
+            };
+            let mut total_count = 0;
+            for elem in parser {
+                total_count += 1;
+                encoder.process_elem(&elem);
+            }
+            if let Err(e) = writer.write_all(&encoder.export_bytes()) {
+                eprintln!("Failed to write MRT data: {}", e);
+            }
+            drop(writer);
+            println!("done. total of {} messages written", total_count);
+        }
+    }
+}
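+
+// Pipeline sketch (hypothetical invocation): BrokenPipe on stdout is
+// deliberately not reported as an error above, so truncating consumers such
+// as `head` stay quiet:
+//
+//   monocle parse updates.mrt | head -n 20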
diff --git a/src/bin/commands/pfx2as.rs b/src/bin/commands/pfx2as.rs
new file mode 100644
index 0000000..f569303
--- /dev/null
+++ b/src/bin/commands/pfx2as.rs
+use clap::Args;
+use ipnet::IpNet;
+use itertools::Itertools;
+use monocle::Pfx2as;
+use serde_json::{json, Value};
+use std::collections::{HashMap, HashSet};
+
+/// Arguments for the Pfx2as command
+#[derive(Args)]
+pub struct Pfx2asArgs {
+    /// Prefix-to-AS mapping data file location
+    #[clap(
+        long,
+        default_value = "https://data.bgpkit.com/pfx2as/pfx2as-latest.json.bz2"
+    )]
+    pub data_file_path: String,
+
+    /// IP prefixes or prefix files (one prefix per line)
+    #[clap(required = true)]
+    pub input: Vec<String>,
+
+    /// Only match exact prefixes. By default, longest-prefix matching is used.
+    #[clap(short, long)]
+    pub exact_match: bool,
+}
+
+pub fn run(args: Pfx2asArgs, json: bool) {
+    let Pfx2asArgs {
+        data_file_path,
+        input,
+        exact_match,
+    } = args;
+
+    let pfx2as = match Pfx2as::new(Some(data_file_path)) {
+        Ok(v) => v,
+        Err(e) => {
+            eprintln!("ERROR: unable to open data file: {}", e);
+            std::process::exit(1);
+        }
+    };
+
+    // collect all prefixes to look up
+    let mut prefixes: Vec<IpNet> = vec![];
+    for i in input {
+        match i.parse::<IpNet>() {
+            Ok(p) => prefixes.push(p),
+            Err(_) => {
+                // it might be a data file
+                if let Ok(lines) = oneio::read_lines(i.as_str()) {
+                    for line in lines.map_while(Result::ok) {
+                        if line.starts_with('#') {
+                            continue;
+                        }
+                        let trimmed = line.trim().split(',').next().unwrap_or(line.as_str());
+                        if let Ok(p) = trimmed.parse::<IpNet>() {
+                            prefixes.push(p);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    // map prefix to origins. one prefix may be mapped to multiple origins
+    prefixes.sort();
+    let mut prefix_origins_map: HashMap<IpNet, HashSet<u32>> = HashMap::new();
+    for p in prefixes {
+        let origins = match exact_match {
+            true => pfx2as.lookup_exact(p),
+            false => pfx2as.lookup_longest(p),
+        };
+        prefix_origins_map.entry(p).or_default().extend(origins);
+    }
+
+    // display
+    if json {
+        // map prefix_origins_map to a vector of JSON objects, each with a
+        // "prefix" and an "origins" field
+        let data = prefix_origins_map
+            .iter()
+            .map(|(p, o)| {
+                json!({"prefix": p.to_string(), "origins": o.iter().cloned().collect::<Vec<u32>>()})
+            })
+            .collect::<Vec<Value>>();
+        if let Err(e) = serde_json::to_writer_pretty(std::io::stdout(), &data) {
+            eprintln!("Error writing JSON to stdout: {}", e);
+        }
+    } else {
+        for (prefix, origins) in prefix_origins_map {
+            let mut origins_vec = origins.iter().cloned().collect::<Vec<_>>();
+            origins_vec.sort();
+            println!("{},{}", prefix, origins_vec.iter().join(","));
+        }
+    }
+}
diff --git a/src/bin/commands/radar.rs b/src/bin/commands/radar.rs
new file mode 100644
index 0000000..e0d26d3
--- /dev/null
+++ b/src/bin/commands/radar.rs
+use clap::Subcommand;
+use radar_rs::RadarClient;
+use serde::Serialize;
+use tabled::settings::Style;
+use tabled::{Table, Tabled};
+
+#[derive(Subcommand)]
+pub enum RadarCommands {
+    /// get routing stats
+    Stats {
+        /// a two-letter country code or AS number (e.g., US or 13335)
+        #[clap(name = "QUERY")]
+        query: Option<String>,
+    },
+
+    /// look up prefix-to-origin mapping on the most recent global routing table snapshot
+    Pfx2as {
+        /// an IP prefix or an AS number (e.g., 1.1.1.0/24 or 13335)
+        #[clap(name = "QUERY")]
+        query: String,
+
+        /// filter by RPKI validation status: valid, invalid, or unknown
+        #[clap(short, long)]
+        rpki_status: Option<String>,
+    },
+}
+
+pub fn run(commands: RadarCommands, json: bool) {
+    let client = match RadarClient::new() {
+        Ok(client) => client,
+        Err(e) => {
+            eprintln!("Failed to create Radar client: {}", e);
+            std::process::exit(1);
+        }
+    };
+
+    match commands {
+        RadarCommands::Stats { query } => run_stats(&client, query, json),
+        RadarCommands::Pfx2as { query, rpki_status } => {
+            run_pfx2as(&client, query, rpki_status, json)
+        }
+    }
+}
+
+fn run_stats(client: &RadarClient, query: Option<String>, json: bool) {
+    let (country, asn) = match query {
+        None => (None, None),
+        Some(q) => match q.parse::<u32>() {
+            Ok(asn) => (None, Some(asn)),
+            Err(_) => (Some(q), None),
+        },
+    };
+
+    let res = match client.get_bgp_routing_stats(asn, country.clone()) {
+        Ok(res) => res,
+        Err(e) => {
+            eprintln!("ERROR: unable to get routing stats: {}", e);
+            return;
+        }
+    };
+
+    let scope = match (country, &asn) {
+        (None, None) =>
"global".to_string(), + (Some(c), None) => c, + (None, Some(asn)) => format!("as{}", asn), + (Some(_), Some(_)) => { + eprintln!("ERROR: cannot specify both country and ASN"); + return; + } + }; + + #[derive(Tabled, Serialize)] + struct Stats { + pub scope: String, + pub origins: u32, + pub prefixes: u32, + pub rpki_valid: String, + pub rpki_invalid: String, + pub rpki_unknown: String, + } + let table_data = vec![ + Stats { + scope: scope.clone(), + origins: res.stats.distinct_origins, + prefixes: res.stats.distinct_prefixes, + rpki_valid: format!( + "{} ({:.2}%)", + res.stats.routes_valid, + (res.stats.routes_valid as f64 / res.stats.routes_total as f64) * 100.0 + ), + rpki_invalid: format!( + "{} ({:.2}%)", + res.stats.routes_invalid, + (res.stats.routes_invalid as f64 / res.stats.routes_total as f64) * 100.0 + ), + rpki_unknown: format!( + "{} ({:.2}%)", + res.stats.routes_unknown, + (res.stats.routes_unknown as f64 / res.stats.routes_total as f64) * 100.0 + ), + }, + Stats { + scope: format!("{} ipv4", scope), + origins: res.stats.distinct_origins_ipv4, + prefixes: res.stats.distinct_prefixes_ipv4, + rpki_valid: format!( + "{} ({:.2}%)", + res.stats.routes_valid_ipv4, + (res.stats.routes_valid_ipv4 as f64 / res.stats.routes_total_ipv4 as f64) * 100.0 + ), + rpki_invalid: format!( + "{} ({:.2}%)", + res.stats.routes_invalid_ipv4, + (res.stats.routes_invalid_ipv4 as f64 / res.stats.routes_total_ipv4 as f64) * 100.0 + ), + rpki_unknown: format!( + "{} ({:.2}%)", + res.stats.routes_unknown_ipv4, + (res.stats.routes_unknown_ipv4 as f64 / res.stats.routes_total_ipv4 as f64) * 100.0 + ), + }, + Stats { + scope: format!("{} ipv6", scope), + origins: res.stats.distinct_origins_ipv6, + prefixes: res.stats.distinct_prefixes_ipv6, + rpki_valid: format!( + "{} ({:.2}%)", + res.stats.routes_valid_ipv6, + (res.stats.routes_valid_ipv6 as f64 / res.stats.routes_total_ipv6 as f64) * 100.0 + ), + rpki_invalid: format!( + "{} ({:.2}%)", + res.stats.routes_invalid_ipv6, + (res.stats.routes_invalid_ipv6 as f64 / res.stats.routes_total_ipv6 as f64) * 100.0 + ), + rpki_unknown: format!( + "{} ({:.2}%)", + res.stats.routes_unknown_ipv6, + (res.stats.routes_unknown_ipv6 as f64 / res.stats.routes_total_ipv6 as f64) * 100.0 + ), + }, + ]; + if json { + match serde_json::to_string_pretty(&table_data) { + Ok(json_str) => println!("{}", json_str), + Err(e) => eprintln!("Failed to serialize JSON: {}", e), + } + } else { + println!("{}", Table::new(table_data).with(Style::modern())); + println!("\nData generated at {} UTC.", res.meta.data_time); + } +} + +fn run_pfx2as(client: &RadarClient, query: String, rpki_status: Option, json: bool) { + let (asn, prefix) = match query.parse::() { + Ok(asn) => (Some(asn), None), + Err(_) => (None, Some(query)), + }; + + let rpki = if let Some(rpki_status) = rpki_status { + match rpki_status.to_lowercase().as_str() { + "valid" | "invalid" | "unknown" => Some(rpki_status), + _ => { + eprintln!("ERROR: invalid rpki status: {}", rpki_status); + return; + } + } + } else { + None + }; + + let res = match client.get_bgp_prefix_origins(asn, prefix, rpki) { + Ok(res) => res, + Err(e) => { + eprintln!("ERROR: unable to get prefix origins: {}", e); + return; + } + }; + + #[derive(Tabled, Serialize)] + struct Pfx2origin { + pub prefix: String, + pub origin: String, + pub rpki: String, + pub visibility: String, + } + + if res.prefix_origins.is_empty() { + println!("no prefix origins found for the given query"); + return; + } + + fn count_to_visibility(count: u32, total: u32) -> String { + let 
ratio = count as f64 / total as f64;
+        if ratio > 0.8 {
+            format!("high ({:.2}%)", ratio * 100.0)
+        } else if ratio < 0.2 {
+            format!("low ({:.2}%)", ratio * 100.0)
+        } else {
+            format!("mid ({:.2}%)", ratio * 100.0)
+        }
+    }
+
+    let table_data = res
+        .prefix_origins
+        .into_iter()
+        .map(|entry| Pfx2origin {
+            prefix: entry.prefix,
+            origin: format!("as{}", entry.origin),
+            rpki: entry.rpki_validation.to_lowercase(),
+            visibility: count_to_visibility(entry.peer_count as u32, res.meta.total_peers as u32),
+        })
+        .collect::<Vec<Pfx2origin>>();
+    if json {
+        match serde_json::to_string_pretty(&table_data) {
+            Ok(json_str) => println!("{}", json_str),
+            Err(e) => eprintln!("Error serializing data to JSON: {}", e),
+        }
+    } else {
+        println!("{}", Table::new(table_data).with(Style::modern()));
+        println!("\nData generated at {} UTC.", res.meta.data_time);
+    }
+}
diff --git a/src/bin/commands/rpki.rs b/src/bin/commands/rpki.rs
new file mode 100644
index 0000000..933374f
--- /dev/null
+++ b/src/bin/commands/rpki.rs
+use chrono::NaiveDate;
+use clap::Subcommand;
+use ipnet::IpNet;
+use monocle::{
+    get_aspas, get_roas, list_by_asn, list_by_prefix, load_rpki_data, summarize_asn, validate,
+    AspaTableEntry, RoaTableItem, SummaryTableItem,
+};
+use serde_json::json;
+use tabled::settings::object::Columns;
+use tabled::settings::width::Width;
+use tabled::settings::Style;
+use tabled::Table;
+
+#[derive(Subcommand)]
+pub enum RpkiCommands {
+    /// validate a prefix-ASN pair with an RPKI validator (Cloudflare)
+    Check {
+        #[clap(short, long)]
+        asn: u32,
+
+        #[clap(short, long)]
+        prefix: String,
+    },
+
+    /// list ROAs by ASN or prefix (Cloudflare real-time)
+    List {
+        /// prefix or ASN
+        #[clap()]
+        resource: String,
+    },
+
+    /// summarize RPKI status for a list of given ASNs (Cloudflare)
+    Summary {
+        #[clap()]
+        asns: Vec<u32>,
+    },
+
+    /// list ROAs from RPKI data (current or historical via bgpkit-commons)
+    Roas {
+        /// Filter by origin ASN
+        #[clap(long)]
+        origin: Option<u32>,
+
+        /// Filter by prefix
+        #[clap(long)]
+        prefix: Option<String>,
+
+        /// Load historical data for this date (YYYY-MM-DD)
+        #[clap(long)]
+        date: Option<String>,
+
+        /// Historical data source: ripe, rpkiviews (default: ripe)
+        #[clap(long, default_value = "ripe")]
+        source: String,
+
+        /// RPKIviews collector: soborost, massars, attn, kerfuffle (default: soborost)
+        #[clap(long, default_value = "soborost")]
+        collector: String,
+    },
+
+    /// list ASPAs from RPKI data (current or historical via bgpkit-commons)
+    Aspas {
+        /// Filter by customer ASN
+        #[clap(long)]
+        customer: Option<u32>,
+
+        /// Filter by provider ASN
+        #[clap(long)]
+        provider: Option<u32>,
+
+        /// Load historical data for this date (YYYY-MM-DD)
+        #[clap(long)]
+        date: Option<String>,
+
+        /// Historical data source: ripe, rpkiviews (default: ripe)
+        #[clap(long, default_value = "ripe")]
+        source: String,
+
+        /// RPKIviews collector: soborost, massars, attn, kerfuffle (default: soborost)
+        #[clap(long, default_value = "soborost")]
+        collector: String,
+    },
+}
+
+pub fn run(commands: RpkiCommands, json: bool) {
+    match commands {
+        RpkiCommands::Check { asn, prefix } => run_check(asn, prefix, json),
+        RpkiCommands::List { resource } => run_list(resource, json),
+        RpkiCommands::Summary { asns } => run_summary(asns, json),
+        RpkiCommands::Roas {
+            origin,
+            prefix,
+            date,
+            source,
+            collector,
+        } => run_roas(origin, prefix, date, source, collector, json),
+        RpkiCommands::Aspas {
+            customer,
+            provider,
+            date,
+            source,
+            collector,
+        } => run_aspas(customer, provider, date, source, collector, json),
+    }
+}
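+
+// Dispatch sketch (hypothetical invocations; subcommand and flag names are
+// clap's derivations from the enum above):
+//
+//   monocle rpki check --asn 13335 --prefix 1.1.1.0/24
+//   monocle rpki roas --origin 13335 --date 2024-01-01 --source ripe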
+
+fn run_check(asn: u32, prefix: String, json: bool) {
+    let (validity, roas) = match validate(asn, prefix.as_str()) {
+        Ok((v1, v2)) => (v1, v2),
+        Err(e) => {
+            eprintln!("ERROR: unable to check RPKI validity: {}", e);
+            return;
+        }
+    };
+    if json {
+        let roa_items: Vec<RoaTableItem> = roas.into_iter().map(RoaTableItem::from).collect();
+        let output = json!({
+            "validation": validity,
+            "covering_roas": roa_items
+        });
+        println!("{}", output);
+    } else {
+        println!("RPKI validation result:");
+        println!("{}", Table::new(vec![validity]).with(Style::markdown()));
+        println!();
+        println!("Covering prefixes:");
+        println!(
+            "{}",
+            Table::new(
+                roas.into_iter()
+                    .map(RoaTableItem::from)
+                    .collect::<Vec<RoaTableItem>>()
+            )
+            .with(Style::markdown())
+        );
+    }
+}
+
+fn run_list(resource: String, json: bool) {
+    let resources = match resource.parse::<u32>() {
+        Ok(asn) => match list_by_asn(asn) {
+            Ok(resources) => resources,
+            Err(e) => {
+                eprintln!("Failed to list ROAs for ASN {}: {}", asn, e);
+                return;
+            }
+        },
+        Err(_) => match resource.parse::<IpNet>() {
+            Ok(prefix) => match list_by_prefix(&prefix) {
+                Ok(resources) => resources,
+                Err(e) => {
+                    eprintln!("Failed to list ROAs for prefix {}: {}", prefix, e);
+                    return;
+                }
+            },
+            Err(_) => {
+                eprintln!(
+                    "ERROR: list resource not an AS number or a prefix: {}",
+                    resource
+                );
+                return;
+            }
+        },
+    };
+
+    let roas: Vec<RoaTableItem> = resources
+        .into_iter()
+        .flat_map(Into::<Vec<RoaTableItem>>::into)
+        .collect();
+    if json {
+        match serde_json::to_string(&roas) {
+            Ok(json_str) => println!("{}", json_str),
+            Err(e) => eprintln!("ERROR: Failed to serialize to JSON: {}", e),
+        }
+    } else if roas.is_empty() {
+        println!("no matching ROAs found for {}", resource);
+    } else {
+        println!("{}", Table::new(roas).with(Style::markdown()));
+    }
+}
+
+fn run_summary(asns: Vec<u32>, json: bool) {
+    let res: Vec<SummaryTableItem> = asns
+        .into_iter()
+        .filter_map(|v| match summarize_asn(v) {
+            Ok(summary) => Some(summary),
+            Err(e) => {
+                eprintln!("Failed to summarize ASN {}: {}", v, e);
+                None
+            }
+        })
+        .collect();
+
+    if json {
+        match serde_json::to_string(&res) {
+            Ok(json_str) => println!("{}", json_str),
+            Err(e) => eprintln!("ERROR: Failed to serialize to JSON: {}", e),
+        }
+    } else {
+        println!("{}", Table::new(res).with(Style::markdown()));
+    }
+}
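+
+// The historical loaders below validate the --date value with chrono before
+// any data is fetched; a minimal sketch of the accepted format (hypothetical
+// test, not part of the original refactor):
+#[cfg(test)]
+mod date_tests {
+    use chrono::NaiveDate;
+
+    #[test]
+    fn accepts_strict_ymd_only() {
+        assert!(NaiveDate::parse_from_str("2024-01-01", "%Y-%m-%d").is_ok());
+        assert!(NaiveDate::parse_from_str("01/01/2024", "%Y-%m-%d").is_err());
+    }
+}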
+
+fn run_roas(
+    origin: Option<u32>,
+    prefix: Option<String>,
+    date: Option<String>,
+    source: String,
+    collector: String,
+    json: bool,
+) {
+    // Parse date if provided
+    let parsed_date = match &date {
+        Some(d) => match NaiveDate::parse_from_str(d, "%Y-%m-%d") {
+            Ok(date) => Some(date),
+            Err(e) => {
+                eprintln!("ERROR: Invalid date format '{}': {}. Use YYYY-MM-DD", d, e);
+                return;
+            }
+        },
+        None => None,
+    };
+
+    // Load RPKI data
+    let commons = match load_rpki_data(parsed_date, Some(source.as_str()), Some(collector.as_str()))
+    {
+        Ok(c) => c,
+        Err(e) => {
+            eprintln!("ERROR: Failed to load RPKI data: {}", e);
+            return;
+        }
+    };
+
+    // Get ROAs with filters
+    let roas = match get_roas(&commons, prefix.as_deref(), origin) {
+        Ok(r) => r,
+        Err(e) => {
+            eprintln!("ERROR: Failed to get ROAs: {}", e);
+            return;
+        }
+    };
+
+    if json {
+        match serde_json::to_string(&roas) {
+            Ok(json_str) => println!("{}", json_str),
+            Err(e) => eprintln!("ERROR: Failed to serialize to JSON: {}", e),
+        }
+    } else if roas.is_empty() {
+        println!("No ROAs found matching the criteria");
+    } else {
+        println!(
+            "Found {} ROAs{}",
+            roas.len(),
+            match &date {
+                Some(d) => format!(" (historical data from {})", d),
+                None => " (current data)".to_string(),
+            }
+        );
+        println!("{}", Table::new(roas).with(Style::markdown()));
+    }
+}
+
+fn run_aspas(
+    customer: Option<u32>,
+    provider: Option<u32>,
+    date: Option<String>,
+    source: String,
+    collector: String,
+    json: bool,
+) {
+    // Parse date if provided
+    let parsed_date = match &date {
+        Some(d) => match NaiveDate::parse_from_str(d, "%Y-%m-%d") {
+            Ok(date) => Some(date),
+            Err(e) => {
+                eprintln!("ERROR: Invalid date format '{}': {}. Use YYYY-MM-DD", d, e);
+                return;
+            }
+        },
+        None => None,
+    };
+
+    // Load RPKI data
+    let commons = match load_rpki_data(parsed_date, Some(source.as_str()), Some(collector.as_str()))
+    {
+        Ok(c) => c,
+        Err(e) => {
+            eprintln!("ERROR: Failed to load RPKI data: {}", e);
+            return;
+        }
+    };
+
+    // Get ASPAs with filters
+    let aspas = match get_aspas(&commons, customer, provider) {
+        Ok(a) => a,
+        Err(e) => {
+            eprintln!("ERROR: Failed to get ASPAs: {}", e);
+            return;
+        }
+    };
+
+    if json {
+        match serde_json::to_string(&aspas) {
+            Ok(json_str) => println!("{}", json_str),
+            Err(e) => eprintln!("ERROR: Failed to serialize to JSON: {}", e),
+        }
+    } else if aspas.is_empty() {
+        println!("No ASPAs found matching the criteria");
+    } else {
+        println!(
+            "Found {} ASPAs{}",
+            aspas.len(),
+            match &date {
+                Some(d) => format!(" (historical data from {})", d),
+                None => " (current data)".to_string(),
+            }
+        );
+        let table_entries: Vec<AspaTableEntry> = aspas.iter().map(AspaTableEntry::from).collect();
+        println!(
+            "{}",
+            Table::new(table_entries)
+                .with(Style::markdown())
+                .modify(Columns::last(), Width::wrap(60).keep_words(true))
+        );
+    }
+}
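+
+// ASPA listing sketch (hypothetical invocations, using the defaults noted in
+// the Aspas variant above):
+//
+//   monocle rpki aspas --customer 13335
+//   monocle rpki aspas --date 2024-06-01 --source rpkiviews --collector massars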
diff --git a/src/bin/commands/search.rs b/src/bin/commands/search.rs
new file mode 100644
index 0000000..3028ddb
--- /dev/null
+++ b/src/bin/commands/search.rs
+use std::collections::HashMap;
+use std::io::Write;
+use std::path::PathBuf;
+use std::sync::mpsc::{channel, Receiver, Sender};
+use std::sync::{Arc, Mutex};
+use std::thread;
+use std::time::Duration;
+
+use bgpkit_parser::encoder::MrtUpdatesEncoder;
+use bgpkit_parser::BgpElem;
+use clap::Args;
+use monocle::{MrtParserFilters, MsgStore, SearchFilters};
+use rayon::prelude::*;
+use tracing::{info, warn};
+
+use crate::commands::elem_to_string;
+
+/// Arguments for the Search command
+#[derive(Args)]
+pub struct SearchArgs {
+    /// Dry run; do not download or parse.
+    #[clap(long)]
+    pub dry_run: bool,
+
+    /// Pretty-print JSON output
+    #[clap(long)]
+    pub pretty: bool,
+
+    /// SQLite output file path
+    #[clap(long)]
+    pub sqlite_path: Option<PathBuf>,
+
+    /// MRT output file path
+    #[clap(long, short = 'M')]
+    pub mrt_path: Option<PathBuf>,
+
+    /// Reset SQLite database content if it exists
+    #[clap(long)]
+    pub sqlite_reset: bool,
+
+    /// Message filters, including the AS path regex string
+    #[clap(flatten)]
+    pub filters: SearchFilters,
+}
+
+/// Maximum number of retry attempts (3 attempts total including the first attempt)
+const MAX_RETRIES: u32 = 3;
+
+/// Initial retry delay in seconds
+const INITIAL_DELAY: u64 = 1;
+
+/// Maximum retry delay in seconds
+const MAX_DELAY: u64 = 30;
+
+/// Message types sent through the writer channel
+#[derive(Debug)]
+enum WriterMessage {
+    /// BGP element with its collector ID
+    Element(Box<BgpElem>, String),
+    /// Signal that a file has been completely processed
+    FileComplete,
+}
+
+/// Progress update messages for real-time display
+#[derive(Debug, Clone)]
+enum ProgressUpdate {
+    /// A file was completed with message count and success status
+    FileComplete { message_count: u32, success: bool },
+    /// A new page started processing
+    PageStarted { page_num: i64, timestamp: String },
+}
+
+/// Structure to track failed processing attempts for the retry mechanism
+#[derive(Debug, Clone)]
+struct FailedItem {
+    item: bgpkit_broker::BrokerItem,
+    attempt_count: u32,
+    last_error: String,
+}
+
+impl FailedItem {
+    fn new(item: bgpkit_broker::BrokerItem, error: String) -> Self {
+        Self {
+            item,
+            attempt_count: 1,
+            last_error: error,
+        }
+    }
+
+    fn next_delay(&self) -> Duration {
+        let base_delay = INITIAL_DELAY * 2_u64.pow(self.attempt_count - 1);
+        let delay = std::cmp::min(base_delay, MAX_DELAY);
+        Duration::from_secs(delay)
+    }
+
+    fn should_retry(&self) -> bool {
+        self.attempt_count < MAX_RETRIES
+    }
+
+    fn increment_attempt(&mut self, error: String) {
+        self.attempt_count += 1;
+        self.last_error = error;
+    }
+}
+
+pub fn run(args: SearchArgs, json: bool) {
+    let SearchArgs {
+        dry_run,
+        pretty,
+        sqlite_path,
+        mrt_path,
+        sqlite_reset,
+        filters,
+    } = args;
+
+    if let Err(e) = filters.validate() {
+        eprintln!("ERROR: {e}");
+        return;
+    }
+
+    let mut sqlite_path_str = "".to_string();
+    let sqlite_db = sqlite_path.and_then(|p| {
+        p.to_str().map(|s| {
+            sqlite_path_str = s.to_string();
+            match MsgStore::new(&Some(sqlite_path_str.clone()), sqlite_reset) {
+                Ok(store) => store,
+                Err(e) => {
+                    eprintln!("Failed to create SQLite store: {}", e);
+                    std::process::exit(1);
+                }
+            }
+        })
+    });
+    let mrt_path = mrt_path.and_then(|p| p.to_str().map(|s| s.to_string()));
+    let show_progress = sqlite_db.is_some() || mrt_path.is_some();
+
+    // Create base broker for pagination
+    let base_broker = match filters.build_broker() {
+        Ok(broker) => broker,
+        Err(e) => {
+            eprintln!("Failed to create broker: {}", e);
+            std::process::exit(1);
+        }
+    };
+
+    if dry_run {
+        // For dry run, get first page to show what would be processed
+        let items = match base_broker.clone().page(1).query_single_page() {
+            Ok(items) => items,
+            Err(e) => {
+                eprintln!("Failed to query broker for dry run: {}", e);
+                std::process::exit(1);
+            }
+        };
+
+        let total_size: i64 = items.iter().map(|x| x.rough_size).sum();
+        println!(
+            "First page: {} files, {} bytes (will process all pages with ~1000 files each)",
+            items.len(),
+            total_size
+        );
+        return;
+    }
+
+    let (sender, receiver): (Sender<WriterMessage>, Receiver<WriterMessage>) = channel();
+    // Single progress channel for all updates
+    let (progress_sender, progress_receiver):
(Sender, Receiver) = + channel(); + + // dedicated thread for handling output of results + let writer_thread = thread::spawn(move || { + let display_stdout = sqlite_db.is_none() && mrt_path.is_none(); + let mut mrt_writer = match mrt_path { + Some(p) => match oneio::get_writer(p.as_str()) { + Ok(writer) => Some((MrtUpdatesEncoder::new(), writer)), + Err(e) => { + eprintln!("Failed to create MRT writer: {}", e); + None + } + }, + None => None, + }; + + let mut current_file_cache = vec![]; + let mut total_msg_count = 0; + + for msg in receiver { + match msg { + WriterMessage::Element(elem, collector) => { + total_msg_count += 1; + + if display_stdout { + let output_str = + match elem_to_string(&elem, json, pretty, collector.as_str()) { + Ok(s) => s, + Err(e) => { + eprintln!("Failed to format element: {}", e); + continue; + } + }; + println!("{output_str}"); + continue; + } + + current_file_cache.push((*elem, collector)); + } + WriterMessage::FileComplete => { + // Commit current file's data to SQLite + if !current_file_cache.is_empty() { + if let Some(db) = &sqlite_db { + if let Err(e) = db.insert_elems(¤t_file_cache) { + eprintln!("Failed to insert elements to database: {}", e); + } + } + if let Some((encoder, _writer)) = &mut mrt_writer { + for (elem, _) in ¤t_file_cache { + encoder.process_elem(elem); + } + } + current_file_cache.clear(); + } + } + } + } + + // Handle any remaining data in cache (in case last file didn't send FileComplete) + if !current_file_cache.is_empty() { + if let Some(db) = &sqlite_db { + if let Err(e) = db.insert_elems(¤t_file_cache) { + eprintln!("Failed to insert elements to database: {}", e); + } + } + if let Some((encoder, _writer)) = &mut mrt_writer { + for (elem, _) in ¤t_file_cache { + encoder.process_elem(elem); + } + } + } + + if let Some((encoder, writer)) = &mut mrt_writer { + let bytes = encoder.export_bytes(); + if let Err(e) = writer.write_all(&bytes) { + eprintln!("Failed to write MRT data: {}", e); + } + } + drop(mrt_writer); + + if !display_stdout { + println!("found {total_msg_count} messages, written into file {sqlite_path_str}"); + } + }); + + // Setup spinner for paginated processing + let pb = if show_progress { + let pb = indicatif::ProgressBar::new_spinner(); + pb.set_message("Processed 0 files, found 0 messages"); + pb.enable_steady_tick(Duration::from_millis(100)); + Some(pb) + } else { + None + }; + + // Simplified progress thread with single channel + let pb_for_updates = pb.clone(); + let progress_thread = thread::spawn(move || { + let mut files_processed: u64 = 0; + let mut total_messages: u64 = 0; + let mut succeeded_files: u64 = 0; + let mut failed_files: u64 = 0; + let mut current_page: i64 = 1; + let mut current_timestamp = String::new(); + + for update in progress_receiver { + match update { + ProgressUpdate::FileComplete { + message_count, + success, + } => { + files_processed += 1; + total_messages += message_count as u64; + if success { + succeeded_files += 1; + } else { + failed_files += 1; + } + } + ProgressUpdate::PageStarted { + page_num, + timestamp, + } => { + current_page = page_num; + current_timestamp = timestamp; + } + } + + // Update progress display + if let Some(ref pb) = pb_for_updates { + let page_info = if current_timestamp.is_empty() { + format!( + " | Page {} (succeeded: {}, failed: {})", + current_page, succeeded_files, failed_files + ) + } else { + format!( + " | Page {} (succeeded: {}, failed: {}) {}", + current_page, succeeded_files, failed_files, current_timestamp + ) + }; + + pb.set_message(format!( + 
"Processed {} files, found {} messages{}", + files_processed, total_messages, page_info + )); + } + } + }); + + // Create shared structure to collect failed items + let failed_items = Arc::new(Mutex::new(Vec::::new())); + let failed_items_clone = Arc::clone(&failed_items); + + // Paginated processing loop + let mut page = 1i64; + + loop { + let items = match base_broker.clone().page(page).query_single_page() { + Ok(items) => items, + Err(e) => { + eprintln!("Failed to fetch page {}: {}", page, e); + break; + } + }; + + if items.is_empty() { + info!("Reached empty page {}, finishing", page); + break; + } + + let page_size = items.len(); + + // Send page started update to progress thread + let time_info = if let Some(first_item) = items.first() { + format!("@ {}", first_item.ts_start.format("%Y-%m-%d %H:%M UTC")) + } else { + String::new() + }; + + if progress_sender + .send(ProgressUpdate::PageStarted { + page_num: page, + timestamp: time_info.clone(), + }) + .is_err() + { + // Progress thread may have ended, continue + } + + if !show_progress { + info!("Starting page {} ({} files){}", page, page_size, time_info); + info!("Processing page {} with {} items", page, page_size); + } + + // Process this page's items using existing parallel logic + let progress_sender_clone = progress_sender.clone(); + + items.into_par_iter().for_each_with( + ( + sender.clone(), + progress_sender_clone, + failed_items_clone.clone(), + ), + |(s, progress_sender, failed_items), item| { + let url = item.url.clone(); + let collector = item.collector_id.clone(); + + if !show_progress { + info!("start parsing {}", url.as_str()); + } + + let parser = match filters.to_parser(url.as_str()) { + Ok(p) => p, + Err(e) => { + let error_msg = format!("Failed to parse {}: {}", url.as_str(), e); + if !show_progress { + eprintln!("{}", error_msg); + } + // Store failed item for retry + if let Ok(mut failed) = failed_items.lock() { + failed.push(FailedItem::new(item, error_msg)); + } + // Send failure progress update + if progress_sender + .send(ProgressUpdate::FileComplete { + message_count: 0, + success: false, + }) + .is_err() + { + // Progress thread may have ended, ignore + } + return; + } + }; + + let mut elems_count = 0; + for elem in parser { + if s.send(WriterMessage::Element(Box::new(elem), collector.clone())) + .is_err() + { + // Channel closed, break out + break; + } + elems_count += 1; + } + + // Send file completion signal to trigger per-file commit + if s.send(WriterMessage::FileComplete).is_err() { + // Channel closed, ignore + } + + // Send success progress update + if progress_sender + .send(ProgressUpdate::FileComplete { + message_count: elems_count, + success: true, + }) + .is_err() + { + // Progress thread may have ended, ignore + } + + if !show_progress { + info!("finished parsing {}", url.as_str()); + } + }, + ); + + // Page processing complete - no need to update counters as they're updated in real-time + + page += 1; + + // Early exit if partial page (last page) + if page_size < 1000 { + info!("Processed final page {} with {} items", page - 1, page_size); + break; + } + } + + if let Some(pb) = pb { + let final_message = format!("Completed {} pages", page - 1); + pb.finish_with_message(final_message); + } + + if !show_progress { + info!("Completed processing across {} pages", page - 1); + } + + // Retry phase for failed items + let failed_count = { + match failed_items.lock() { + Ok(failed) => failed.len(), + Err(e) => { + warn!("Failed to lock failed_items mutex: {}", e); + 0 + } + } + }; + + if 
failed_count > 0 { + if !show_progress { + info!("Starting retry phase for {} failed items", failed_count); + } + + // Process retries sequentially to avoid overwhelming servers + let mut retry_queue = { + match failed_items.lock() { + Ok(failed) => failed.clone(), + Err(e) => { + warn!("Failed to lock failed_items mutex for retry: {}", e); + vec![] + } + } + }; + + let mut retry_stats = HashMap::new(); + let mut total_retries = 0; + let mut successful_retries = 0; + + while !retry_queue.is_empty() { + let mut new_failures = Vec::new(); + + for mut failed_item in retry_queue { + if !failed_item.should_retry() { + // Max retries reached + *retry_stats.entry("max_retries_reached").or_insert(0) += 1; + continue; + } + + let delay = failed_item.next_delay(); + if !show_progress { + info!( + "Retrying {} (attempt {}/{}) after {}s delay", + failed_item.item.url.as_str(), + failed_item.attempt_count + 1, + MAX_RETRIES, + delay.as_secs() + ); + } + + thread::sleep(delay); + total_retries += 1; + + let parser = match filters.to_parser(failed_item.item.url.as_str()) { + Ok(p) => p, + Err(e) => { + let error_msg = format!( + "Retry failed to parse {}: {}", + failed_item.item.url.as_str(), + e + ); + if !show_progress { + warn!("{}", error_msg); + } + failed_item.increment_attempt(error_msg); + new_failures.push(failed_item); + continue; + } + }; + + let mut elems_count = 0; + let mut parse_successful = true; + + for elem in parser { + if sender + .send(WriterMessage::Element( + Box::new(elem), + failed_item.item.collector_id.clone(), + )) + .is_err() + { + // Channel closed, mark as failed + parse_successful = false; + break; + } + elems_count += 1; + } + + // Send file completion signal for retry as well + if parse_successful && sender.send(WriterMessage::FileComplete).is_err() { + parse_successful = false; + } + + if parse_successful { + successful_retries += 1; + // Retry successful - progress already tracked by main processing + if !show_progress { + info!( + "Successfully retried {} (found {} messages)", + failed_item.item.url.as_str(), + elems_count + ); + } + } else { + let error_msg = "Retry failed: channel closed during processing".to_string(); + failed_item.increment_attempt(error_msg); + new_failures.push(failed_item); + } + } + + retry_queue = new_failures; + } + + // Log retry statistics + let final_failures = retry_queue.len(); + if !show_progress { + info!( + "Retry phase completed: {} total retry attempts, {} successful, {} final failures", + total_retries, successful_retries, final_failures + ); + + if final_failures > 0 { + warn!( + "Warning: {} files could not be processed after {} retry attempts", + final_failures, MAX_RETRIES + ); + } + } + } + + // Close channels to signal completion + drop(sender); + drop(progress_sender); + + // wait for the output thread to stop + if let Err(e) = writer_thread.join() { + eprintln!("Writer thread failed: {:?}", e); + } + if let Err(e) = progress_thread.join() { + eprintln!("Progress thread failed: {:?}", e); + } +} diff --git a/src/bin/commands/time.rs b/src/bin/commands/time.rs new file mode 100644 index 0000000..31d9a02 --- /dev/null +++ b/src/bin/commands/time.rs @@ -0,0 +1,31 @@ +use clap::Args; +use monocle::{parse_time_string_to_rfc3339, time_to_table}; + +/// Arguments for the Time command +#[derive(Args)] +pub struct TimeArgs { + /// Time stamp or time string to convert + #[clap()] + pub time: Vec, + + /// Simple output, only print the converted time + #[clap(short, long)] + pub simple: bool, +} + +pub fn run(args: TimeArgs) { + let 
TimeArgs { time, simple } = args; + + let timestring_res = match simple { + true => parse_time_string_to_rfc3339(&time), + false => time_to_table(&time), + }; + match timestring_res { + Ok(t) => { + println!("{t}") + } + Err(e) => { + eprintln!("ERROR: {e}") + } + }; +} diff --git a/src/bin/commands/whois.rs b/src/bin/commands/whois.rs new file mode 100644 index 0000000..3ac23f5 --- /dev/null +++ b/src/bin/commands/whois.rs @@ -0,0 +1,168 @@ +use clap::Args; +use monocle::{As2org, MonocleConfig, SearchResult, SearchResultConcise, SearchType}; +use tabled::settings::Style; +use tabled::Table; + +/// Arguments for the Whois command +#[derive(Args)] +pub struct WhoisArgs { + /// Search query, an ASN (e.g. "400644") or a name (e.g. "bgpkit") + pub query: Vec, + + /// Search AS and Org name only + #[clap(short, long)] + pub name_only: bool, + + /// Search by ASN only + #[clap(short, long)] + pub asn_only: bool, + + /// Search by country only + #[clap(short = 'C', long)] + pub country_only: bool, + + /// Refresh the local as2org database + #[clap(short, long)] + pub update: bool, + + /// Output to pretty table, default markdown table + #[clap(short, long)] + pub pretty: bool, + + /// Display a full table (with ord_id, org_size) + #[clap(short = 'F', long)] + pub full_table: bool, + + /// Export to pipe-separated values + #[clap(short = 'P', long)] + pub psv: bool, + + /// Show full country names instead of 2-letter code + #[clap(short, long)] + pub full_country: bool, +} + +pub fn run(config: &MonocleConfig, args: WhoisArgs) { + let WhoisArgs { + query, + name_only, + asn_only, + country_only, + update, + pretty, + full_table, + full_country, + psv, + } = args; + + let data_dir = config.data_dir.as_str(); + let as2org = match As2org::new(&Some(format!("{data_dir}/monocle-data.sqlite3"))) { + Ok(as2org) => as2org, + Err(e) => { + eprintln!("Failed to create AS2org database: {}", e); + std::process::exit(1); + } + }; + + if update { + // if the update flag is set, clear existing as2org data and re-download later + if let Err(e) = as2org.clear_db() { + eprintln!("Failed to clear database: {}", e); + std::process::exit(1); + } + } + + if as2org.is_db_empty() { + println!("bootstrapping as2org data now... 
(it will take about one minute)"); + if let Err(e) = as2org.parse_insert_as2org(None) { + eprintln!("Failed to bootstrap AS2org data: {}", e); + std::process::exit(1); + } + println!("bootstrapping as2org data finished"); + } + + let mut search_type: SearchType = match (name_only, asn_only) { + (true, false) => SearchType::NameOnly, + (false, true) => SearchType::AsnOnly, + (false, false) => SearchType::Guess, + (true, true) => { + eprintln!("ERROR: name-only and asn-only cannot be both true"); + return; + } + }; + + if country_only { + search_type = SearchType::CountryOnly; + } + + let mut res = query + .into_iter() + .flat_map( + |q| match as2org.search(q.as_str(), &search_type, full_country) { + Ok(results) => results, + Err(e) => { + eprintln!("Search error for '{}': {}", q, e); + Vec::new() + } + }, + ) + .collect::>(); + + // order search results by AS number + res.sort_by_key(|v| v.asn); + + match full_table { + false => { + let res_concise = res.into_iter().map(|x: SearchResult| SearchResultConcise { + asn: x.asn, + as_name: x.as_name, + org_name: x.org_name, + org_country: x.org_country, + }); + if psv { + println!("asn|asn_name|org_name|org_country"); + for res in res_concise { + println!( + "{}|{}|{}|{}", + res.asn, res.as_name, res.org_name, res.org_country + ); + } + return; + } + + match pretty { + true => { + println!("{}", Table::new(res_concise).with(Style::rounded())); + } + false => { + println!("{}", Table::new(res_concise).with(Style::markdown())); + } + }; + } + true => { + if psv { + println!("asn|asn_name|org_name|org_id|org_country|org_size"); + for entry in res { + println!( + "{}|{}|{}|{}|{}|{}", + entry.asn, + entry.as_name, + entry.org_name, + entry.org_id, + entry.org_country, + entry.org_size + ); + } + return; + } + match pretty { + true => { + println!("{}", Table::new(res).with(Style::rounded())); + } + false => { + println!("{}", Table::new(res).with(Style::markdown())); + } + }; + } + } +} diff --git a/src/bin/monocle.rs b/src/bin/monocle.rs index b92bb6c..6b82792 100644 --- a/src/bin/monocle.rs +++ b/src/bin/monocle.rs @@ -2,92 +2,25 @@ #![deny(clippy::unwrap_used)] #![deny(clippy::expect_used)] -use std::collections::{HashMap, HashSet}; -use std::io::Write; -use std::net::IpAddr; -use std::path::PathBuf; -use std::sync::mpsc::{channel, Receiver, Sender}; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::Duration; - -use bgpkit_parser::encoder::MrtUpdatesEncoder; use bgpkit_parser::BgpElem; -use chrono::NaiveDate; use clap::{Parser, Subcommand}; -use ipnet::IpNet; -use itertools::Itertools; -use json_to_table::json_to_table; use monocle::*; -use radar_rs::RadarClient; -use rayon::prelude::*; -use serde::Serialize; -use serde_json::{json, Value}; -use tabled::settings::object::Columns; -use tabled::settings::width::Width; -use tabled::settings::Style; -use tabled::{Table, Tabled}; -use tracing::{info, warn, Level}; - -/// Maximum number of retry attempts (3 attempts total including the first attempt) -const MAX_RETRIES: u32 = 3; - -/// Initial retry delay in seconds -const INITIAL_DELAY: u64 = 1; - -/// Maximum retry delay in seconds -const MAX_DELAY: u64 = 30; - -/// Message types sent through the writer channel -#[derive(Debug)] -enum WriterMessage { - /// BGP element with its collector ID - Element(Box, String), - /// Signal that a file has been completely processed - FileComplete, -} - -/// Progress update messages for real-time display -#[derive(Debug, Clone)] -enum ProgressUpdate { - /// A file was completed with message count and 
success status - FileComplete { message_count: u32, success: bool }, - /// A new page started processing - PageStarted { page_num: i64, timestamp: String }, -} - -/// Structure to track failed processing attempts for retry mechanism -#[derive(Debug, Clone)] -struct FailedItem { - item: bgpkit_broker::BrokerItem, - attempt_count: u32, - last_error: String, -} - -impl FailedItem { - fn new(item: bgpkit_broker::BrokerItem, error: String) -> Self { - Self { - item, - attempt_count: 1, - last_error: error, - } - } - - fn next_delay(&self) -> Duration { - let base_delay = INITIAL_DELAY * 2_u64.pow(self.attempt_count - 1); - let delay = std::cmp::min(base_delay, MAX_DELAY); - Duration::from_secs(delay) - } - - fn should_retry(&self) -> bool { - self.attempt_count < MAX_RETRIES - } - - fn increment_attempt(&mut self, error: String) { - self.attempt_count += 1; - self.last_error = error; - } -} +use serde_json::json; +use tracing::Level; + +mod commands; + +// Re-export argument types from command modules for use in the Commands enum +use commands::broker::BrokerArgs; +use commands::country::CountryArgs; +use commands::ip::IpArgs; +use commands::parse::ParseArgs; +use commands::pfx2as::Pfx2asArgs; +use commands::radar::RadarCommands; +use commands::rpki::RpkiCommands; +use commands::search::SearchArgs; +use commands::time::TimeArgs; +use commands::whois::WhoisArgs; #[derive(Parser)] #[clap(author, version, about, long_about = None)] @@ -112,134 +45,22 @@ struct Cli { #[derive(Subcommand)] enum Commands { /// Parse individual MRT files given a file path, local or remote. - Parse { - /// File path to an MRT file, local or remote. - #[clap(name = "FILE")] - file_path: PathBuf, - - /// Pretty-print JSON output - #[clap(long)] - pretty: bool, + Parse(ParseArgs), - /// MRT output file path - #[clap(long, short = 'M')] - mrt_path: Option, - - /// Filter by AS path regex string - #[clap(flatten)] - filters: ParseFilters, - }, /// Query BGPKIT Broker for the meta data of available MRT files. - Broker { - /// starting timestamp (RFC3339 or unix epoch) - #[clap(long, short = 't')] - start_ts: String, - - /// ending timestamp (RFC3339 or unix epoch) - #[clap(long, short = 'T')] - end_ts: String, - - /// BGP collector name: e.g. rrc00, route-views2 - #[clap(long, short = 'c')] - collector: Option, - - /// BGP collection project name, e.g. routeviews, or riperis - #[clap(long, short = 'P')] - project: Option, - - /// Data type, e.g., updates or rib - #[clap(long)] - data_type: Option, - - /// Page number to fetch (1-based). If set, only this page will be fetched. - #[clap(long)] - page: Option, - - /// Page size for broker queries (default 1000) - #[clap(long)] - page_size: Option, - }, + Broker(BrokerArgs), /// Search BGP messages from all available public MRT files. - Search { - /// Dry-run, do not download or parse. - #[clap(long)] - dry_run: bool, + Search(SearchArgs), - /// Pretty-print JSON output - #[clap(long)] - pretty: bool, - - /// SQLite output file path - #[clap(long)] - sqlite_path: Option, - - /// MRT output file path - #[clap(long, short = 'M')] - mrt_path: Option, - - /// SQLite reset database content if exists - #[clap(long)] - sqlite_reset: bool, - - /// Filter by AS path regex string - #[clap(flatten)] - filters: SearchFilters, - }, /// ASN and organization lookup utility. - Whois { - /// Search query, an ASN (e.g. "400644") or a name (e.g. 
"bgpkit") - query: Vec, - - /// Search AS and Org name only - #[clap(short, long)] - name_only: bool, - - /// Search by ASN only - #[clap(short, long)] - asn_only: bool, - - /// Search by country only - #[clap(short = 'C', long)] - country_only: bool, - - /// Refresh the local as2org database - #[clap(short, long)] - update: bool, - - /// Output to pretty table, default markdown table - #[clap(short, long)] - pretty: bool, - - /// Display a full table (with ord_id, org_size) - #[clap(short = 'F', long)] - full_table: bool, - - /// Export to pipe-separated values - #[clap(short = 'P', long)] - psv: bool, - - /// Show full country names instead of 2-letter code - #[clap(short, long)] - full_country: bool, - }, + Whois(WhoisArgs), /// Country name and code lookup utilities - Country { - /// Search query, e.g. "US" or "United States" - queries: Vec, - }, + Country(CountryArgs), /// Time conversion utilities - Time { - /// Time stamp or time string to convert - #[clap()] - time: Vec, - - /// Simple output, only print the converted time - #[clap(short, long)] - simple: bool, - }, + Time(TimeArgs), /// RPKI utilities Rpki { @@ -248,15 +69,7 @@ enum Commands { }, /// IP information lookup - Ip { - /// IP address to look up (optional) - #[clap()] - ip: Option, - - /// Print IP address only (e.g., for getting the public IP address quickly) - #[clap(long)] - simple: bool, - }, + Ip(IpArgs), /// Cloudflare Radar API lookup (set CF_API_TOKEN to enable) Radar { @@ -265,117 +78,10 @@ enum Commands { }, /// Bulk prefix-to-AS mapping lookup with the pre-generated data file. - Pfx2as { - /// Prefix-to-AS mapping data file location - #[clap( - long, - default_value = "https://data.bgpkit.com/pfx2as/pfx2as-latest.json.bz2" - )] - data_file_path: String, - - /// IP prefixes or prefix files (one prefix per line) - #[clap(required = true)] - input: Vec, - - /// Only matching exact prefixes. By default, it does longest-prefix matching. 
- #[clap(short, long)] - exact_match: bool, - }, + Pfx2as(Pfx2asArgs), } -#[derive(Subcommand)] -enum RpkiCommands { - /// validate a prefix-asn pair with a RPKI validator (Cloudflare) - Check { - #[clap(short, long)] - asn: u32, - - #[clap(short, long)] - prefix: String, - }, - - /// list ROAs by ASN or prefix (Cloudflare real-time) - List { - /// prefix or ASN - #[clap()] - resource: String, - }, - - /// summarize RPKI status for a list of given ASNs (Cloudflare) - Summary { - #[clap()] - asns: Vec, - }, - - /// list ROAs from RPKI data (current or historical via bgpkit-commons) - Roas { - /// Filter by origin ASN - #[clap(long)] - origin: Option, - - /// Filter by prefix - #[clap(long)] - prefix: Option, - - /// Load historical data for this date (YYYY-MM-DD) - #[clap(long)] - date: Option, - - /// Historical data source: ripe, rpkiviews (default: ripe) - #[clap(long, default_value = "ripe")] - source: String, - - /// RPKIviews collector: soborost, massars, attn, kerfuffle (default: soborost) - #[clap(long, default_value = "soborost")] - collector: String, - }, - - /// list ASPAs from RPKI data (current or historical via bgpkit-commons) - Aspas { - /// Filter by customer ASN - #[clap(long)] - customer: Option, - - /// Filter by provider ASN - #[clap(long)] - provider: Option, - - /// Load historical data for this date (YYYY-MM-DD) - #[clap(long)] - date: Option, - - /// Historical data source: ripe, rpkiviews (default: ripe) - #[clap(long, default_value = "ripe")] - source: String, - - /// RPKIviews collector: soborost, massars, attn, kerfuffle (default: soborost) - #[clap(long, default_value = "soborost")] - collector: String, - }, -} - -#[derive(Subcommand)] -enum RadarCommands { - /// get routing stats - Stats { - /// a two-letter country code or asn number (e.g., US or 13335) - #[clap(name = "QUERY")] - query: Option, - }, - - /// look up prefix-to-origin mapping on the most recent global routing table snapshot - Pfx2as { - /// an IP prefix or an AS number (e.g., 1.1.1.0/24 or 13335) - #[clap(name = "QUERY")] - query: String, - - /// filter by RPKI validation status, valid, invalid, or unknown - #[clap(short, long)] - rpki_status: Option, - }, -} - -fn elem_to_string( +pub(crate) fn elem_to_string( elem: &BgpElem, json: bool, pretty: bool, @@ -420,1351 +126,15 @@ fn main() { // You can check for the existence of subcommands, and if found, use their // matches just as you would the top level cmd match cli.command { - Commands::Parse { - file_path, - pretty, - mrt_path, - filters, - } => { - if let Err(e) = filters.validate() { - eprintln!("ERROR: {e}"); - return; - } - - let file_path = match file_path.to_str() { - Some(path) => path, - None => { - eprintln!("Invalid file path"); - std::process::exit(1); - } - }; - let parser = match filters.to_parser(file_path) { - Ok(p) => p, - Err(e) => { - eprintln!("Failed to create parser for {}: {}", file_path, e); - std::process::exit(1); - } - }; - - let mut stdout = std::io::stdout(); - - match mrt_path { - None => { - for elem in parser { - // output to stdout - let output_str = match elem_to_string(&elem, json, pretty, "") { - Ok(s) => s, - Err(e) => { - eprintln!("Failed to format element: {}", e); - continue; - } - }; - if let Err(e) = writeln!(stdout, "{}", &output_str) { - if e.kind() != std::io::ErrorKind::BrokenPipe { - eprintln!("ERROR: {e}"); - } - std::process::exit(1); - } - } - } - Some(p) => { - let path = match p.to_str() { - Some(path) => path.to_string(), - None => { - eprintln!("Invalid MRT path"); - std::process::exit(1); 
-                        }
-                    };
-                    println!("processing. filtered messages output to {}...", &path);
-                    let mut encoder = MrtUpdatesEncoder::new();
-                    let mut writer = match oneio::get_writer(&path) {
-                        Ok(w) => w,
-                        Err(e) => {
-                            eprintln!("ERROR: {e}");
-                            std::process::exit(1);
-                        }
-                    };
-                    let mut total_count = 0;
-                    for elem in parser {
-                        total_count += 1;
-                        encoder.process_elem(&elem);
-                    }
-                    if let Err(e) = writer.write_all(&encoder.export_bytes()) {
-                        eprintln!("Failed to write MRT data: {}", e);
-                    }
-                    drop(writer);
-                    println!("done. total of {} messages written", total_count);
-                }
-            }
-        }
-        Commands::Search {
-            dry_run,
-            pretty,
-            mrt_path,
-            sqlite_path,
-            sqlite_reset,
-            filters,
-        } => {
-            if let Err(e) = filters.validate() {
-                eprintln!("ERROR: {e}");
-                return;
-            }
-
-            let mut sqlite_path_str = "".to_string();
-            let sqlite_db = sqlite_path.and_then(|p| {
-                p.to_str().map(|s| {
-                    sqlite_path_str = s.to_string();
-                    match MsgStore::new(&Some(sqlite_path_str.clone()), sqlite_reset) {
-                        Ok(store) => store,
-                        Err(e) => {
-                            eprintln!("Failed to create SQLite store: {}", e);
-                            std::process::exit(1);
-                        }
-                    }
-                })
-            });
-            let mrt_path = mrt_path.and_then(|p| p.to_str().map(|s| s.to_string()));
-            let show_progress = sqlite_db.is_some() || mrt_path.is_some();
-
-            // Create base broker for pagination
-            let base_broker = match filters.build_broker() {
-                Ok(broker) => broker,
-                Err(e) => {
-                    eprintln!("Failed to create broker: {}", e);
-                    std::process::exit(1);
-                }
-            };
-
-            if dry_run {
-                // For dry run, get first page to show what would be processed
-                let items = match base_broker.clone().page(1).query_single_page() {
-                    Ok(items) => items,
-                    Err(e) => {
-                        eprintln!("Failed to query broker for dry run: {}", e);
-                        std::process::exit(1);
-                    }
-                };
-
-                let total_size: i64 = items.iter().map(|x| x.rough_size).sum();
-                println!(
-                    "First page: {} files, {} bytes (will process all pages with ~1000 files each)",
-                    items.len(),
-                    total_size
-                );
-                return;
-            }
-
-            let (sender, receiver): (Sender<WriterMessage>, Receiver<WriterMessage>) = channel();
-            // Single progress channel for all updates
-            let (progress_sender, progress_receiver): (
-                Sender<ProgressUpdate>,
-                Receiver<ProgressUpdate>,
-            ) = channel();
-
-            // dedicated thread for handling output of results
-            let writer_thread = thread::spawn(move || {
-                let display_stdout = sqlite_db.is_none() && mrt_path.is_none();
-                let mut mrt_writer = match mrt_path {
-                    Some(p) => match oneio::get_writer(p.as_str()) {
-                        Ok(writer) => Some((MrtUpdatesEncoder::new(), writer)),
-                        Err(e) => {
-                            eprintln!("Failed to create MRT writer: {}", e);
-                            None
-                        }
-                    },
-                    None => None,
-                };
-
-                let mut current_file_cache = vec![];
-                let mut total_msg_count = 0;
-
-                for msg in receiver {
-                    match msg {
-                        WriterMessage::Element(elem, collector) => {
-                            total_msg_count += 1;
-
-                            if display_stdout {
-                                let output_str =
-                                    match elem_to_string(&elem, json, pretty, collector.as_str()) {
-                                        Ok(s) => s,
-                                        Err(e) => {
-                                            eprintln!("Failed to format element: {}", e);
-                                            continue;
-                                        }
-                                    };
-                                println!("{output_str}");
-                                continue;
-                            }
-
-                            current_file_cache.push((*elem, collector));
-                        }
-                        WriterMessage::FileComplete => {
-                            // Commit current file's data to SQLite
-                            if !current_file_cache.is_empty() {
-                                if let Some(db) = &sqlite_db {
-                                    if let Err(e) = db.insert_elems(&current_file_cache) {
-                                        eprintln!("Failed to insert elements to database: {}", e);
-                                    }
-                                }
-                                if let Some((encoder, _writer)) = &mut mrt_writer {
-                                    for (elem, _) in &current_file_cache {
-                                        encoder.process_elem(elem);
-                                    }
-                                }
-                                current_file_cache.clear();
-                            }
-                        }
-                    }
-                }
-
-                // Handle any remaining data in cache (in case last file didn't send FileComplete)
-                if !current_file_cache.is_empty() {
-                    if let Some(db) = &sqlite_db {
-                        if let Err(e) = db.insert_elems(&current_file_cache) {
-                            eprintln!("Failed to insert elements to database: {}", e);
-                        }
-                    }
-                    if let Some((encoder, _writer)) = &mut mrt_writer {
-                        for (elem, _) in &current_file_cache {
-                            encoder.process_elem(elem);
-                        }
-                    }
-                }
-
-                if let Some((encoder, writer)) = &mut mrt_writer {
-                    let bytes = encoder.export_bytes();
-                    if let Err(e) = writer.write_all(&bytes) {
-                        eprintln!("Failed to write MRT data: {}", e);
-                    }
-                }
-                drop(mrt_writer);
-
-                if !display_stdout {
-                    println!(
-                        "found {total_msg_count} messages, written into file {sqlite_path_str}"
-                    );
-                }
-            });
-
-            // Setup spinner for paginated processing
-            let pb = if show_progress {
-                let pb = indicatif::ProgressBar::new_spinner();
-                pb.set_message("Processed 0 files, found 0 messages");
-                pb.enable_steady_tick(Duration::from_millis(100));
-                Some(pb)
-            } else {
-                None
-            };
-
-            // Simplified progress thread with single channel
-            let pb_for_updates = pb.clone();
-            let progress_thread = thread::spawn(move || {
-                let mut files_processed: u64 = 0;
-                let mut total_messages: u64 = 0;
-                let mut succeeded_files: u64 = 0;
-                let mut failed_files: u64 = 0;
-                let mut current_page: i64 = 1;
-                let mut current_timestamp = String::new();
-
-                for update in progress_receiver {
-                    match update {
-                        ProgressUpdate::FileComplete {
-                            message_count,
-                            success,
-                        } => {
-                            files_processed += 1;
-                            total_messages += message_count as u64;
-                            if success {
-                                succeeded_files += 1;
-                            } else {
-                                failed_files += 1;
-                            }
-                        }
-                        ProgressUpdate::PageStarted {
-                            page_num,
-                            timestamp,
-                        } => {
-                            current_page = page_num;
-                            current_timestamp = timestamp;
-                        }
-                    }
-
-                    // Update progress display
-                    if let Some(ref pb) = pb_for_updates {
-                        let page_info = if current_timestamp.is_empty() {
-                            format!(
-                                " | Page {} (succeeded: {}, failed: {})",
-                                current_page, succeeded_files, failed_files
-                            )
-                        } else {
-                            format!(
-                                " | Page {} (succeeded: {}, failed: {}) {}",
-                                current_page, succeeded_files, failed_files, current_timestamp
-                            )
-                        };
-
-                        pb.set_message(format!(
-                            "Processed {} files, found {} messages{}",
-                            files_processed, total_messages, page_info
-                        ));
-                    }
-                }
-            });
-
-            // Create shared structure to collect failed items
-            let failed_items = Arc::new(Mutex::new(Vec::<FailedItem>::new()));
-            let failed_items_clone = Arc::clone(&failed_items);
-
-            // Paginated processing loop
-            let mut page = 1i64;
-
-            loop {
-                let items = match base_broker.clone().page(page).query_single_page() {
-                    Ok(items) => items,
-                    Err(e) => {
-                        eprintln!("Failed to fetch page {}: {}", page, e);
-                        break;
-                    }
-                };
-
-                if items.is_empty() {
-                    info!("Reached empty page {}, finishing", page);
-                    break;
-                }
-
-                let page_size = items.len();
-
-                // Send page started update to progress thread
-                let time_info = if let Some(first_item) = items.first() {
-                    format!("@ {}", first_item.ts_start.format("%Y-%m-%d %H:%M UTC"))
-                } else {
-                    String::new()
-                };
-
-                if progress_sender
-                    .send(ProgressUpdate::PageStarted {
-                        page_num: page,
-                        timestamp: time_info.clone(),
-                    })
-                    .is_err()
-                {
-                    // Progress thread may have ended, continue
-                }
-
-                if !show_progress {
-                    info!("Starting page {} ({} files){}", page, page_size, time_info);
-                    info!("Processing page {} with {} items", page, page_size);
-                }
-
-                // Process this page's items using existing parallel logic
-                let progress_sender_clone = progress_sender.clone();
-
-                items.into_par_iter().for_each_with(
-                    (
-                        sender.clone(),
-                        progress_sender_clone,
-                        failed_items_clone.clone(),
-                    ),
-                    |(s, progress_sender, failed_items), item| {
-                        let url = item.url.clone();
-                        let collector = item.collector_id.clone();
-
-                        if !show_progress {
-                            info!("start parsing {}", url.as_str());
-                        }
-
-                        let parser = match filters.to_parser(url.as_str()) {
-                            Ok(p) => p,
-                            Err(e) => {
-                                let error_msg = format!("Failed to parse {}: {}", url.as_str(), e);
-                                if !show_progress {
-                                    eprintln!("{}", error_msg);
-                                }
-                                // Store failed item for retry
-                                if let Ok(mut failed) = failed_items.lock() {
-                                    failed.push(FailedItem::new(item, error_msg));
-                                }
-                                // Send failure progress update
-                                if progress_sender
-                                    .send(ProgressUpdate::FileComplete {
-                                        message_count: 0,
-                                        success: false,
-                                    })
-                                    .is_err()
-                                {
-                                    // Progress thread may have ended, ignore
-                                }
-                                return;
-                            }
-                        };
-
-                        let mut elems_count = 0;
-                        for elem in parser {
-                            if s.send(WriterMessage::Element(Box::new(elem), collector.clone()))
-                                .is_err()
-                            {
-                                // Channel closed, break out
-                                break;
-                            }
-                            elems_count += 1;
-                        }
-
-                        // Send file completion signal to trigger per-file commit
-                        if s.send(WriterMessage::FileComplete).is_err() {
-                            // Channel closed, ignore
-                        }
-
-                        // Send success progress update
-                        if progress_sender
-                            .send(ProgressUpdate::FileComplete {
-                                message_count: elems_count,
-                                success: true,
-                            })
-                            .is_err()
-                        {
-                            // Progress thread may have ended, ignore
-                        }
-
-                        if !show_progress {
-                            info!("finished parsing {}", url.as_str());
-                        }
-                    },
-                );
-
-                // Page processing complete - no need to update counters as they're updated in real-time
-
-                page += 1;
-
-                // Early exit if partial page (last page)
-                if page_size < 1000 {
-                    info!("Processed final page {} with {} items", page - 1, page_size);
-                    break;
-                }
-            }
-
-            if let Some(pb) = pb {
-                let final_message = format!("Completed {} pages", page - 1);
-                pb.finish_with_message(final_message);
-            }
-
-            if !show_progress {
-                info!("Completed processing across {} pages", page - 1);
-            }
-
-            // Retry phase for failed items
-            let failed_count = {
-                match failed_items.lock() {
-                    Ok(failed) => failed.len(),
-                    Err(e) => {
-                        warn!("Failed to lock failed_items mutex: {}", e);
-                        0
-                    }
-                }
-            };
-
-            if failed_count > 0 {
-                if !show_progress {
-                    info!("Starting retry phase for {} failed items", failed_count);
-                }
-
-                // Process retries sequentially to avoid overwhelming servers
-                let mut retry_queue = {
-                    match failed_items.lock() {
-                        Ok(failed) => failed.clone(),
-                        Err(e) => {
-                            warn!("Failed to lock failed_items mutex for retry: {}", e);
-                            vec![]
-                        }
-                    }
-                };
-
-                let mut retry_stats = HashMap::new();
-                let mut total_retries = 0;
-                let mut successful_retries = 0;
-
-                while !retry_queue.is_empty() {
-                    let mut new_failures = Vec::new();
-
-                    for mut failed_item in retry_queue {
-                        if !failed_item.should_retry() {
-                            // Max retries reached
-                            *retry_stats.entry("max_retries_reached").or_insert(0) += 1;
-                            continue;
-                        }
-
-                        let delay = failed_item.next_delay();
-                        if !show_progress {
-                            info!(
-                                "Retrying {} (attempt {}/{}) after {}s delay",
-                                failed_item.item.url.as_str(),
-                                failed_item.attempt_count + 1,
-                                MAX_RETRIES,
-                                delay.as_secs()
-                            );
-                        }
-
-                        thread::sleep(delay);
-                        total_retries += 1;
-
-                        let parser = match filters.to_parser(failed_item.item.url.as_str()) {
-                            Ok(p) => p,
-                            Err(e) => {
-                                let error_msg = format!(
-                                    "Retry failed to parse {}: {}",
-                                    failed_item.item.url.as_str(),
-                                    e
-                                );
-                                if !show_progress {
-                                    warn!("{}", error_msg);
-                                }
-                                failed_item.increment_attempt(error_msg);
-                                new_failures.push(failed_item);
-                                continue;
-                            }
-                        };
-
-                        let mut elems_count = 0;
-                        let mut parse_successful = true;
-
-                        for elem in parser {
-                            if sender
-                                .send(WriterMessage::Element(
-                                    Box::new(elem),
-                                    failed_item.item.collector_id.clone(),
-                                ))
-                                .is_err()
-                            {
-                                // Channel closed, mark as failed
-                                parse_successful = false;
-                                break;
-                            }
-                            elems_count += 1;
-                        }
-
-                        // Send file completion signal for retry as well
-                        if parse_successful && sender.send(WriterMessage::FileComplete).is_err() {
-                            parse_successful = false;
-                        }
-
-                        if parse_successful {
-                            successful_retries += 1;
-                            // Retry successful - progress already tracked by main processing
-                            if !show_progress {
-                                info!(
-                                    "Successfully retried {} (found {} messages)",
-                                    failed_item.item.url.as_str(),
-                                    elems_count
-                                );
-                            }
-                        } else {
-                            let error_msg =
-                                "Retry failed: channel closed during processing".to_string();
-                            failed_item.increment_attempt(error_msg);
-                            new_failures.push(failed_item);
-                        }
-                    }
-
-                    retry_queue = new_failures;
-                }
-
-                // Log retry statistics
-                let final_failures = retry_queue.len();
-                if !show_progress {
-                    info!(
-                        "Retry phase completed: {} total retry attempts, {} successful, {} final failures",
-                        total_retries, successful_retries, final_failures
-                    );
-
-                    if final_failures > 0 {
-                        warn!(
-                            "Warning: {} files could not be processed after {} retry attempts",
-                            final_failures, MAX_RETRIES
-                        );
-                    }
-                }
-            }
-
-            // Close channels to signal completion
-            drop(sender);
-            drop(progress_sender);
-
-            // wait for the output thread to stop
-            if let Err(e) = writer_thread.join() {
-                eprintln!("Writer thread failed: {:?}", e);
-            }
-            if let Err(e) = progress_thread.join() {
-                eprintln!("Progress thread failed: {:?}", e);
-            }
-        }
-
-        Commands::Broker {
-            start_ts,
-            end_ts,
-            collector,
-            project,
-            data_type,
-            page,
-            page_size,
-        } => {
-            // parse time strings similar to Search subcommand
-            let ts_start = match string_to_time(&start_ts) {
-                Ok(t) => t.timestamp(),
-                Err(_) => {
-                    eprintln!("start-ts is not a valid time string: {}", start_ts);
-                    std::process::exit(1);
-                }
-            };
-            let ts_end = match string_to_time(&end_ts) {
-                Ok(t) => t.timestamp(),
-                Err(_) => {
-                    eprintln!("end-ts is not a valid time string: {}", end_ts);
-                    std::process::exit(1);
-                }
-            };
-
-            let mut broker = bgpkit_broker::BgpkitBroker::new()
-                .ts_start(ts_start)
-                .ts_end(ts_end);
-
-            if let Some(c) = collector {
-                broker = broker.collector_id(c.as_str());
-            }
-            if let Some(p) = project {
-                broker = broker.project(p.as_str());
-            }
-            if let Some(dt) = data_type {
-                broker = broker.data_type(dt.as_str());
-            }
-
-            let page_size = page_size.unwrap_or(1000);
-            broker = broker.page_size(page_size);
-
-            let res = if let Some(p) = page {
-                broker.page(p).query_single_page()
-            } else {
-                // Use query() and limit to at most 10 pages worth of items
-                match broker.query() {
-                    Ok(mut v) => {
-                        let max_items = (page_size * 10) as usize;
-                        if v.len() > max_items {
-                            v.truncate(max_items);
-                        }
-                        Ok(v)
-                    }
-                    Err(e) => Err(e),
-                }
-            };
-
-            match res {
-                Ok(items) => {
-                    if items.is_empty() {
-                        println!("No MRT files found");
-                        return;
-                    }
-
-                    if json {
-                        match serde_json::to_string_pretty(&items) {
-                            Ok(json_str) => println!("{}", json_str),
-                            Err(e) => eprintln!("error serializing: {}", e),
-                        }
-                    } else {
-                        #[derive(Tabled)]
-                        struct BrokerItemDisplay {
-                            #[tabled(rename = "Collector")]
-                            collector_id: String,
-                            #[tabled(rename = "Type")]
-                            data_type: String,
-                            #[tabled(rename = "Start Time (UTC)")]
-                            ts_start: String,
-                            #[tabled(rename = "URL")]
-                            url: String,
-                            #[tabled(rename = "Size (Bytes)")]
-                            rough_size: i64,
-                        }
-
-                        let display_items: Vec<BrokerItemDisplay> = items
-                            .into_iter()
-                            .map(|item| BrokerItemDisplay {
-                                collector_id: item.collector_id,
-                                data_type: item.data_type,
-                                ts_start: item.ts_start.format("%Y-%m-%dT%H:%M:%SZ").to_string(),
-                                url: item.url,
-                                rough_size: item.rough_size,
-                            })
-                            .collect();
-
-                        println!("{}", Table::new(display_items).with(Style::markdown()));
-                    }
-                }
-                Err(e) => {
-                    eprintln!("failed to query: {}", e);
-                }
-            }
-        }
-
-        Commands::Whois {
-            query,
-            name_only,
-            asn_only,
-            update,
-            pretty,
-            full_table,
-            full_country,
-            country_only,
-            psv,
-        } => {
-            let data_dir = config.data_dir.as_str();
-            let as2org = match As2org::new(&Some(format!("{data_dir}/monocle-data.sqlite3"))) {
-                Ok(as2org) => as2org,
-                Err(e) => {
-                    eprintln!("Failed to create AS2org database: {}", e);
-                    std::process::exit(1);
-                }
-            };
-
-            if update {
-                // if the update flag is set, clear existing as2org data and re-download later
-                if let Err(e) = as2org.clear_db() {
-                    eprintln!("Failed to clear database: {}", e);
-                    std::process::exit(1);
-                }
-            }
-
-            if as2org.is_db_empty() {
-                println!("bootstrapping as2org data now... (it will take about one minute)");
-                if let Err(e) = as2org.parse_insert_as2org(None) {
-                    eprintln!("Failed to bootstrap AS2org data: {}", e);
-                    std::process::exit(1);
-                }
-                println!("bootstrapping as2org data finished");
-            }
-
-            let mut search_type: SearchType = match (name_only, asn_only) {
-                (true, false) => SearchType::NameOnly,
-                (false, true) => SearchType::AsnOnly,
-                (false, false) => SearchType::Guess,
-                (true, true) => {
-                    eprintln!("ERROR: name-only and asn-only cannot be both true");
-                    return;
-                }
-            };
-
-            if country_only {
-                search_type = SearchType::CountryOnly;
-            }
-
-            let mut res = query
-                .into_iter()
-                .flat_map(
-                    |q| match as2org.search(q.as_str(), &search_type, full_country) {
-                        Ok(results) => results,
-                        Err(e) => {
-                            eprintln!("Search error for '{}': {}", q, e);
-                            Vec::new()
-                        }
-                    },
-                )
-                .collect::<Vec<SearchResult>>();
-
-            // order search results by AS number
-            res.sort_by_key(|v| v.asn);
-
-            match full_table {
-                false => {
-                    let res_concise = res.into_iter().map(|x: SearchResult| SearchResultConcise {
-                        asn: x.asn,
-                        as_name: x.as_name,
-                        org_name: x.org_name,
-                        org_country: x.org_country,
-                    });
-                    if psv {
-                        println!("asn|asn_name|org_name|org_country");
-                        for res in res_concise {
-                            println!(
-                                "{}|{}|{}|{}",
-                                res.asn, res.as_name, res.org_name, res.org_country
-                            );
-                        }
-                        return;
-                    }
-
-                    match pretty {
-                        true => {
-                            println!("{}", Table::new(res_concise).with(Style::rounded()));
-                        }
-                        false => {
-                            println!("{}", Table::new(res_concise).with(Style::markdown()));
-                        }
-                    };
-                }
-                true => {
-                    if psv {
-                        println!("asn|asn_name|org_name|org_id|org_country|org_size");
-                        for entry in res {
-                            println!(
-                                "{}|{}|{}|{}|{}|{}",
-                                entry.asn,
-                                entry.as_name,
-                                entry.org_name,
-                                entry.org_id,
-                                entry.org_country,
-                                entry.org_size
-                            );
-                        }
-                        return;
-                    }
-                    match pretty {
-                        true => {
-                            println!("{}", Table::new(res).with(Style::rounded()));
-                        }
-                        false => {
-                            println!("{}", Table::new(res).with(Style::markdown()));
-                        }
-                    };
-                }
-            }
-        }
-        Commands::Time { time, simple } => {
-            let timestring_res = match simple {
-                true => parse_time_string_to_rfc3339(&time),
-                false => time_to_table(&time),
-            };
-            match timestring_res {
-                Ok(t) => {
-                    println!("{t}")
-                }
-                Err(e) => {
-                    eprintln!("ERROR: {e}")
-                }
-            };
-        }
-        Commands::Country { queries } => {
-            let lookup = CountryLookup::new();
-            let res: Vec<_> = queries
-                .into_iter()
-                .flat_map(|query| lookup.lookup(query.as_str()))
-                .collect();
-            println!("{}", Table::new(res).with(Style::rounded()));
-        }
-        Commands::Rpki { commands } => match commands {
-            RpkiCommands::Check { asn, prefix } => {
-                let (validity, roas) = match validate(asn, prefix.as_str()) {
-                    Ok((v1, v2)) => (v1, v2),
-                    Err(e) => {
-                        eprintln!("ERROR: unable to check RPKI validity: {}", e);
-                        return;
-                    }
-                };
-                if json {
-                    let roa_items: Vec<RoaTableItem> =
-                        roas.into_iter().map(RoaTableItem::from).collect();
-                    let output = json!({
-                        "validation": validity,
-                        "covering_roas": roa_items
-                    });
-                    println!("{}", output);
-                } else {
-                    println!("RPKI validation result:");
-                    println!("{}", Table::new(vec![validity]).with(Style::markdown()));
-                    println!();
-                    println!("Covering prefixes:");
-                    println!(
-                        "{}",
-                        Table::new(
-                            roas.into_iter()
-                                .map(RoaTableItem::from)
-                                .collect::<Vec<RoaTableItem>>()
-                        )
-                        .with(Style::markdown())
-                    );
-                }
-            }
-            RpkiCommands::List { resource } => {
-                let resources = match resource.parse::<u32>() {
-                    Ok(asn) => match list_by_asn(asn) {
-                        Ok(resources) => resources,
-                        Err(e) => {
-                            eprintln!("Failed to list ROAs for ASN {}: {}", asn, e);
-                            return;
-                        }
-                    },
-                    Err(_) => match resource.parse::<IpNet>() {
-                        Ok(prefix) => match list_by_prefix(&prefix) {
-                            Ok(resources) => resources,
-                            Err(e) => {
-                                eprintln!("Failed to list ROAs for prefix {}: {}", prefix, e);
-                                return;
-                            }
-                        },
-                        Err(_) => {
-                            eprintln!(
-                                "ERROR: list resource not an AS number or a prefix: {}",
-                                resource
-                            );
-                            return;
-                        }
-                    },
-                };
-
-                let roas: Vec<RoaTableItem> = resources
-                    .into_iter()
-                    .flat_map(Into::<Vec<RoaTableItem>>::into)
-                    .collect();
-                if json {
-                    match serde_json::to_string(&roas) {
-                        Ok(json_str) => println!("{}", json_str),
-                        Err(e) => eprintln!("ERROR: Failed to serialize to JSON: {}", e),
-                    }
-                } else if roas.is_empty() {
-                    println!("no matching ROAS found for {}", resource);
-                } else {
-                    println!("{}", Table::new(roas).with(Style::markdown()));
-                }
-            }
-            RpkiCommands::Summary { asns } => {
-                let res: Vec<_> = asns
-                    .into_iter()
-                    .filter_map(|v| match summarize_asn(v) {
-                        Ok(summary) => Some(summary),
-                        Err(e) => {
-                            eprintln!("Failed to summarize ASN {}: {}", v, e);
-                            None
-                        }
-                    })
-                    .collect();
-
-                if json {
-                    match serde_json::to_string(&res) {
-                        Ok(json_str) => println!("{}", json_str),
-                        Err(e) => eprintln!("ERROR: Failed to serialize to JSON: {}", e),
-                    }
-                } else {
-                    println!("{}", Table::new(res).with(Style::markdown()));
-                }
-            }
-            RpkiCommands::Roas {
-                origin,
-                prefix,
-                date,
-                source,
-                collector,
-            } => {
-                // Parse date if provided
-                let parsed_date = match &date {
-                    Some(d) => match NaiveDate::parse_from_str(d, "%Y-%m-%d") {
-                        Ok(date) => Some(date),
-                        Err(e) => {
-                            eprintln!("ERROR: Invalid date format '{}': {}. Use YYYY-MM-DD", d, e);
-                            return;
-                        }
-                    },
-                    None => None,
-                };
-
-                // Load RPKI data
-                let commons = match load_rpki_data(
-                    parsed_date,
-                    Some(source.as_str()),
-                    Some(collector.as_str()),
-                ) {
-                    Ok(c) => c,
-                    Err(e) => {
-                        eprintln!("ERROR: Failed to load RPKI data: {}", e);
-                        return;
-                    }
-                };
-
-                // Get ROAs with filters
-                let roas = match get_roas(&commons, prefix.as_deref(), origin) {
-                    Ok(r) => r,
-                    Err(e) => {
-                        eprintln!("ERROR: Failed to get ROAs: {}", e);
-                        return;
-                    }
-                };
-
-                if json {
-                    match serde_json::to_string(&roas) {
-                        Ok(json_str) => println!("{}", json_str),
-                        Err(e) => eprintln!("ERROR: Failed to serialize to JSON: {}", e),
-                    }
-                } else if roas.is_empty() {
-                    println!("No ROAs found matching the criteria");
-                } else {
-                    println!(
-                        "Found {} ROAs{}",
-                        roas.len(),
-                        match &date {
-                            Some(d) => format!(" (historical data from {})", d),
-                            None => " (current data)".to_string(),
-                        }
-                    );
-                    println!("{}", Table::new(roas).with(Style::markdown()));
-                }
-            }
-            RpkiCommands::Aspas {
-                customer,
-                provider,
-                date,
-                source,
-                collector,
-            } => {
-                // Parse date if provided
-                let parsed_date = match &date {
-                    Some(d) => match NaiveDate::parse_from_str(d, "%Y-%m-%d") {
-                        Ok(date) => Some(date),
-                        Err(e) => {
-                            eprintln!("ERROR: Invalid date format '{}': {}. Use YYYY-MM-DD", d, e);
-                            return;
-                        }
-                    },
-                    None => None,
-                };
-
-                // Load RPKI data
-                let commons = match load_rpki_data(
-                    parsed_date,
-                    Some(source.as_str()),
-                    Some(collector.as_str()),
-                ) {
-                    Ok(c) => c,
-                    Err(e) => {
-                        eprintln!("ERROR: Failed to load RPKI data: {}", e);
-                        return;
-                    }
-                };
-
-                // Get ASPAs with filters
-                let aspas = match get_aspas(&commons, customer, provider) {
-                    Ok(a) => a,
-                    Err(e) => {
-                        eprintln!("ERROR: Failed to get ASPAs: {}", e);
-                        return;
-                    }
-                };
-
-                if json {
-                    match serde_json::to_string(&aspas) {
-                        Ok(json_str) => println!("{}", json_str),
-                        Err(e) => eprintln!("ERROR: Failed to serialize to JSON: {}", e),
-                    }
-                } else if aspas.is_empty() {
-                    println!("No ASPAs found matching the criteria");
-                } else {
-                    println!(
-                        "Found {} ASPAs{}",
-                        aspas.len(),
-                        match &date {
-                            Some(d) => format!(" (historical data from {})", d),
-                            None => " (current data)".to_string(),
-                        }
-                    );
-                    let table_entries: Vec<AspaTableEntry> =
-                        aspas.iter().map(AspaTableEntry::from).collect();
-                    println!(
-                        "{}",
-                        Table::new(table_entries)
-                            .with(Style::markdown())
-                            .modify(Columns::last(), Width::wrap(60).keep_words(true))
-                    );
-                }
-            }
-        },
-        Commands::Radar { commands } => {
-            let client = match RadarClient::new() {
-                Ok(client) => client,
-                Err(e) => {
-                    eprintln!("Failed to create Radar client: {}", e);
-                    std::process::exit(1);
-                }
-            };
-
-            match commands {
-                RadarCommands::Stats { query } => {
-                    let (country, asn) = match query {
-                        None => (None, None),
-                        Some(q) => match q.parse::<u32>() {
-                            Ok(asn) => (None, Some(asn)),
-                            Err(_) => (Some(q), None),
-                        },
-                    };
-
-                    let res = match client.get_bgp_routing_stats(asn, country.clone()) {
-                        Ok(res) => res,
-                        Err(e) => {
-                            eprintln!("ERROR: unable to get routing stats: {}", e);
-                            return;
-                        }
-                    };
-
-                    let scope = match (country, &asn) {
-                        (None, None) => "global".to_string(),
-                        (Some(c), None) => c,
-                        (None, Some(asn)) => format!("as{}", asn),
-                        (Some(_), Some(_)) => {
-                            eprintln!("ERROR: cannot specify both country and ASN");
-                            return;
-                        }
-                    };
-
-                    #[derive(Tabled, Serialize)]
-                    struct Stats {
-                        pub scope: String,
-                        pub origins: u32,
-                        pub prefixes: u32,
-                        pub rpki_valid: String,
-                        pub rpki_invalid: String,
-                        pub rpki_unknown: String,
-                    }
-                    let table_data = vec![
-                        Stats {
-                            scope: scope.clone(),
-                            origins: res.stats.distinct_origins,
-                            prefixes: res.stats.distinct_prefixes,
-                            rpki_valid: format!(
-                                "{} ({:.2}%)",
-                                res.stats.routes_valid,
-                                (res.stats.routes_valid as f64 / res.stats.routes_total as f64)
-                                    * 100.0
-                            ),
-                            rpki_invalid: format!(
-                                "{} ({:.2}%)",
-                                res.stats.routes_invalid,
-                                (res.stats.routes_invalid as f64 / res.stats.routes_total as f64)
-                                    * 100.0
-                            ),
-                            rpki_unknown: format!(
-                                "{} ({:.2}%)",
-                                res.stats.routes_unknown,
-                                (res.stats.routes_unknown as f64 / res.stats.routes_total as f64)
-                                    * 100.0
-                            ),
-                        },
-                        Stats {
-                            scope: format!("{} ipv4", scope),
-                            origins: res.stats.distinct_origins_ipv4,
-                            prefixes: res.stats.distinct_prefixes_ipv4,
-                            rpki_valid: format!(
-                                "{} ({:.2}%)",
-                                res.stats.routes_valid_ipv4,
-                                (res.stats.routes_valid_ipv4 as f64
-                                    / res.stats.routes_total_ipv4 as f64)
-                                    * 100.0
-                            ),
-                            rpki_invalid: format!(
-                                "{} ({:.2}%)",
-                                res.stats.routes_invalid_ipv4,
-                                (res.stats.routes_invalid_ipv4 as f64
-                                    / res.stats.routes_total_ipv4 as f64)
-                                    * 100.0
-                            ),
-                            rpki_unknown: format!(
-                                "{} ({:.2}%)",
-                                res.stats.routes_unknown_ipv4,
-                                (res.stats.routes_unknown_ipv4 as f64
-                                    / res.stats.routes_total_ipv4 as f64)
-                                    * 100.0
-                            ),
-                        },
-                        Stats {
-                            scope: format!("{} ipv6", scope),
-                            origins: res.stats.distinct_origins_ipv6,
-                            prefixes: res.stats.distinct_prefixes_ipv6,
-                            rpki_valid: format!(
-                                "{} ({:.2}%)",
-                                res.stats.routes_valid_ipv6,
-                                (res.stats.routes_valid_ipv6 as f64
-                                    / res.stats.routes_total_ipv6 as f64)
-                                    * 100.0
-                            ),
-                            rpki_invalid: format!(
-                                "{} ({:.2}%)",
-                                res.stats.routes_invalid_ipv6,
-                                (res.stats.routes_invalid_ipv6 as f64
-                                    / res.stats.routes_total_ipv6 as f64)
-                                    * 100.0
-                            ),
-                            rpki_unknown: format!(
-                                "{} ({:.2}%)",
-                                res.stats.routes_unknown_ipv6,
-                                (res.stats.routes_unknown_ipv6 as f64
-                                    / res.stats.routes_total_ipv6 as f64)
-                                    * 100.0
-                            ),
-                        },
-                    ];
-                    if json {
-                        match serde_json::to_string_pretty(&table_data) {
-                            Ok(json_str) => println!("{}", json_str),
-                            Err(e) => eprintln!("Failed to serialize JSON: {}", e),
-                        }
-                    } else {
-                        println!("{}", Table::new(table_data).with(Style::modern()));
-                        println!("\nData generated at {} UTC.", res.meta.data_time);
-                    }
-                }
-                RadarCommands::Pfx2as { query, rpki_status } => {
-                    let (asn, prefix) = match query.parse::<u32>() {
-                        Ok(asn) => (Some(asn), None),
-                        Err(_) => (None, Some(query)),
-                    };
-
-                    let rpki = if let Some(rpki_status) = rpki_status {
-                        match rpki_status.to_lowercase().as_str() {
-                            "valid" | "invalid" | "unknown" => Some(rpki_status),
-                            _ => {
-                                eprintln!("ERROR: invalid rpki status: {}", rpki_status);
-                                return;
-                            }
-                        }
-                    } else {
-                        None
-                    };
-
-                    let res = match client.get_bgp_prefix_origins(asn, prefix, rpki) {
-                        Ok(res) => res,
-                        Err(e) => {
-                            eprintln!("ERROR: unable to get prefix origins: {}", e);
-                            return;
-                        }
-                    };
-
-                    #[derive(Tabled, Serialize)]
-                    struct Pfx2origin {
-                        pub prefix: String,
-                        pub origin: String,
-                        pub rpki: String,
-                        pub visibility: String,
-                    }
-
-                    if res.prefix_origins.is_empty() {
-                        println!("no prefix origins found for the given query");
-                        return;
-                    }
-
-                    fn count_to_visibility(count: u32, total: u32) -> String {
-                        let ratio = count as f64 / total as f64;
-                        if ratio > 0.8 {
-                            format!("high ({:.2}%)", ratio * 100.0)
-                        } else if ratio < 0.2 {
-                            format!("low ({:.2}%)", ratio * 100.0)
-                        } else {
-                            format!("mid ({:.2}%)", ratio * 100.0)
-                        }
-                    }
-
-                    let table_data = res
-                        .prefix_origins
-                        .into_iter()
-                        .map(|entry| Pfx2origin {
-                            prefix: entry.prefix,
-                            origin: format!("as{}", entry.origin),
-                            rpki: entry.rpki_validation.to_lowercase(),
-                            visibility: count_to_visibility(
-                                entry.peer_count as u32,
-                                res.meta.total_peers as u32,
-                            ),
-                        })
-                        .collect::<Vec<_>>();
-                    if json {
-                        match serde_json::to_string_pretty(&table_data) {
-                            Ok(json_str) => println!("{}", json_str),
-                            Err(e) => eprintln!("Error serializing data to JSON: {}", e),
-                        }
-                    } else {
-                        println!("{}", Table::new(table_data).with(Style::modern()));
-                        println!("\nData generated at {} UTC.", res.meta.data_time);
-                    }
-                }
-            }
-        }
-        Commands::Ip { ip, simple } => match fetch_ip_info(ip, simple) {
-            Ok(ipinfo) => {
-                if simple {
-                    println!("{}", ipinfo.ip);
-                    return;
-                }
-
-                let json_value = json!(&ipinfo);
-                if json {
-                    if let Err(e) = serde_json::to_writer_pretty(std::io::stdout(), &json_value) {
-                        eprintln!("Error writing JSON to stdout: {}", e);
-                    }
-                } else {
-                    let mut table = json_to_table(&json_value);
-                    table.collapse();
-                    println!("{}", table);
-                }
-            }
-            Err(e) => {
-                eprintln!("ERROR: unable to get ip information: {e}");
-            }
-        },
-        Commands::Pfx2as {
-            data_file_path,
-            input,
-            exact_match,
-        } => {
-            let pfx2as = match Pfx2as::new(Some(data_file_path)) {
-                Ok(v) => v,
-                Err(e) => {
-                    eprintln!("ERROR: unable to open data file: {}", e);
-                    std::process::exit(1);
-                }
-            };
-
-            // collect all prefixes to look up
-            let mut prefixes: Vec<IpNet> = vec![];
-            for i in input {
-                match i.parse::<IpNet>() {
-                    Ok(p) => prefixes.push(p),
-                    Err(_) => {
-                        // it might be a data file
-                        if let Ok(lines) = oneio::read_lines(i.as_str()) {
-                            for line in lines.map_while(Result::ok) {
-                                if line.starts_with('#') {
-                                    continue;
-                                }
-                                let trimmed =
-                                    line.trim().split(',').next().unwrap_or(line.as_str());
-                                if let Ok(p) = trimmed.parse::<IpNet>() {
-                                    prefixes.push(p);
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-
-            // map prefix to origins. one prefix may be mapped to multiple origins
-            prefixes.sort();
-            let mut prefix_origins_map: HashMap<IpNet, HashSet<u32>> = HashMap::new();
-            for p in prefixes {
-                let origins = match exact_match {
-                    true => pfx2as.lookup_exact(p),
-                    false => pfx2as.lookup_longest(p),
-                };
-                prefix_origins_map.entry(p).or_default().extend(origins);
-            }
-
-            // display
-            if json {
-                // map prefix_origin_pairs to a vector of JSON objects each with a
-                // "prefix" and "origin" field
-                let data = prefix_origins_map
-                    .iter()
-                    .map(|(p, o)| json!({"prefix": p.to_string(), "origins": o.iter().cloned().collect::<Vec<u32>>()}))
-                    .collect::<Vec<_>>();
-                if let Err(e) = serde_json::to_writer_pretty(std::io::stdout(), &data) {
-                    eprintln!("Error writing JSON to stdout: {}", e);
-                }
-            } else {
-                for (prefix, origins) in prefix_origins_map {
-                    let mut origins_vec = origins.iter().cloned().collect::<Vec<u32>>();
-                    origins_vec.sort();
-                    println!("{},{}", prefix, origins_vec.iter().join(","));
-                }
-            }
-        }
+        Commands::Parse(args) => commands::parse::run(args, json),
+        Commands::Search(args) => commands::search::run(args, json),
+        Commands::Broker(args) => commands::broker::run(args, json),
+        Commands::Whois(args) => commands::whois::run(&config, args),
+        Commands::Time(args) => commands::time::run(args),
+        Commands::Country(args) => commands::country::run(args),
+        Commands::Rpki { commands } => commands::rpki::run(commands, json),
+        Commands::Radar { commands } => commands::radar::run(commands, json),
+        Commands::Ip(args) => commands::ip::run(args, json),
+        Commands::Pfx2as(args) => commands::pfx2as::run(args, json),
     }
 }
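
Note on the new layout: each submodule under src/bin/commands/ now owns both its clap argument struct and its run() entry point, so main() shrinks to the dispatch above. For readers unfamiliar with clap's Args derive, the following is a rough, illustrative sketch of what one such submodule could look like; it is not a copy of the files added by this patch. The shape of CountryArgs is inferred from the removed inline Commands::Country arm, and it assumes CountryLookup is exported by the monocle library crate, as the old main file's usage suggests.

    // src/bin/commands/country.rs -- illustrative sketch only
    use clap::Args;
    use monocle::CountryLookup; // assumed export from the library crate
    use tabled::settings::Style;
    use tabled::Table;

    #[derive(Args)]
    pub struct CountryArgs {
        /// country names or codes to look up
        pub queries: Vec<String>,
    }

    pub fn run(args: CountryArgs) {
        let lookup = CountryLookup::new();
        // flatten per-query results into one table, as the old inline arm did
        let res: Vec<_> = args
            .queries
            .into_iter()
            .flat_map(|query| lookup.lookup(query.as_str()))
            .collect();
        println!("{}", Table::new(res).with(Style::rounded()));
    }

src/bin/commands/mod.rs then declares `pub mod country;` (one line per sibling module), and the Commands enum in monocle.rs wraps the struct as a tuple variant, e.g. `Country(commands::country::CountryArgs)`, which is what lets the dispatch above pass `args` straight through to each module's run() without any change to CLI behavior.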