diff --git a/agent/crates/enterprise-utils/src/lib.rs b/agent/crates/enterprise-utils/src/lib.rs index f3a5af7cbdf..462f214f9bc 100644 --- a/agent/crates/enterprise-utils/src/lib.rs +++ b/agent/crates/enterprise-utils/src/lib.rs @@ -436,3 +436,83 @@ pub mod rpc { } } } + +pub mod ai_agent { + use std::sync::Arc; + use std::time::Duration; + + #[derive(Debug, Clone, Default)] + pub struct AgentMeta { + pub first_seen: Duration, + pub last_seen: Duration, + pub matched_endpoint: String, + } + + #[derive(Debug, Clone, Default)] + pub struct AiAgentRegistry; + + impl AiAgentRegistry { + pub fn new() -> Self { + AiAgentRegistry + } + + pub fn register(&self, _pid: u32, _endpoint: &str, _now: Duration) -> bool { + false + } + + pub fn is_ai_agent(&self, _pid: u32) -> bool { + false + } + + pub fn get_all_pids(&self) -> Vec<u32> { + vec![] + } + + pub fn cleanup_dead_pids(&self, _alive_pids: &[u32]) -> Vec<u32> { + vec![] + } + + pub fn len(&self) -> usize { + 0 + } + + pub fn is_empty(&self) -> bool { + true + } + + pub fn sync_bpf_map_add(&self, _pid: u32) {} + + pub fn sync_bpf_map_remove(&self, _pid: u32) {} + + #[cfg(target_os = "linux")] + pub fn set_bpf_map_fd(&self, _fd: i32) {} + + pub fn set_file_io_enabled(&self, _enabled: bool) {} + + pub fn record_endpoint_hit(&self, _pid: u32, _endpoint: &str, _now: Duration) -> bool { + false + } + } + + /// Check if a URL path matches an AI Agent endpoint pattern. + pub fn match_ai_agent_endpoint( + _endpoints: &[String], + _path: &str, + _pid: u32, + _now: Duration, + ) -> Option<String> { + None + } + + /// Initialize the global AI Agent registry. Returns the registry Arc. + /// Stub: returns a no-op registry. + pub fn init_global_registry() -> Arc<AiAgentRegistry> { + Arc::new(AiAgentRegistry::new()) + } + + /// Get a reference to the global AI Agent registry. + /// Stub: always returns None. 
+ pub fn global_registry() -> Option<&'static Arc<AiAgentRegistry>> { + None + } +} diff --git a/agent/src/common/ebpf.rs b/agent/src/common/ebpf.rs index 47c6ea5af80..06ede88e067 100644 --- a/agent/src/common/ebpf.rs +++ b/agent/src/common/ebpf.rs @@ -40,6 +40,10 @@ pub const GO_HTTP2_UPROBE_DATA: u8 = 5; pub const SOCKET_CLOSE_EVENT: u8 = 6; // unix socket pub const UNIX_SOCKET: u8 = 8; +// AI Agent governance event types +pub const FILE_OP_EVENT: u8 = 9; +pub const PERM_OP_EVENT: u8 = 10; +pub const PROC_LIFECYCLE_EVENT: u8 = 11; const EBPF_TYPE_TRACEPOINT: u8 = 0; const EBPF_TYPE_TLS_UPROBE: u8 = 1; diff --git a/agent/src/common/flow.rs b/agent/src/common/flow.rs index 7104dd1b25b..585f00d948e 100644 --- a/agent/src/common/flow.rs +++ b/agent/src/common/flow.rs @@ -539,6 +539,10 @@ impl From for flow_log::FlowPerfStats { } } +// Business type constants for process classification +pub const BIZ_TYPE_DEFAULT: u8 = 0; +pub const BIZ_TYPE_AI_AGENT: u8 = 1; + #[derive(Clone, Debug, Default)] pub struct L7Stats { pub stats: L7PerfStats, diff --git a/agent/src/common/l7_protocol_log.rs b/agent/src/common/l7_protocol_log.rs index 507bcb1bc17..a86865e2835 100644 --- a/agent/src/common/l7_protocol_log.rs +++ b/agent/src/common/l7_protocol_log.rs @@ -24,6 +24,8 @@ use std::sync::{ }; use std::time::Duration; +use crate::common::l7_protocol_info::L7ProtocolInfoInterface; + use enum_dispatch::enum_dispatch; use log::debug; use lru::LruCache; @@ -248,6 +250,16 @@ impl L7ParseResult { L7ParseResult::None => panic!("parse result is none but unwrap multi"), } } + + /// Check if any parsed result has the given biz_type. + /// Used to detect AI Agent flows after parsing. 
+ pub fn has_biz_type(&self, biz_type: u8) -> bool { + match self { + L7ParseResult::Single(info) => info.get_biz_type() == biz_type, + L7ParseResult::Multi(infos) => infos.iter().any(|i| i.get_biz_type() == biz_type), + L7ParseResult::None => false, + } + } } #[enum_dispatch] @@ -690,6 +702,8 @@ pub struct ParseParam<'a> { pub oracle_parse_conf: OracleConfig, pub iso8583_parse_conf: Iso8583ParseConfig, pub web_sphere_mq_parse_conf: WebSphereMqParseConfig, + + pub process_id: u32, } impl<'a> fmt::Debug for ParseParam<'a> { @@ -793,6 +807,8 @@ impl<'a> ParseParam<'a> { oracle_parse_conf: OracleConfig::default(), iso8583_parse_conf: Iso8583ParseConfig::default(), web_sphere_mq_parse_conf: WebSphereMqParseConfig::default(), + + process_id: packet.process_id, } } } @@ -811,7 +827,13 @@ impl<'a> ParseParam<'a> { } pub fn set_buf_size(&mut self, buf_size: usize) { - self.buf_size = buf_size as u16; + // Saturate to u16::MAX to avoid overflow when AI Agent flows use larger payload sizes. + // buf_size is informational for plugins; actual payload truncation uses the usize value directly. 
+ self.buf_size = if buf_size > u16::MAX as usize { + u16::MAX + } else { + buf_size as u16 + }; } pub fn set_captured_byte(&mut self, captured_byte: usize) { diff --git a/agent/src/common/proc_event/linux.rs b/agent/src/common/proc_event/linux.rs index 3d6abce0748..8742e439b2c 100644 --- a/agent/src/common/proc_event/linux.rs +++ b/agent/src/common/proc_event/linux.rs @@ -21,17 +21,18 @@ use std::{ use prost::Message; use public::{ - bytes::{read_u32_le, read_u64_le}, + bytes::{read_u16_le, read_u32_le, read_u64_le}, proto::metric, sender::{SendMessageType, Sendable}, }; use crate::common::{ - ebpf::IO_EVENT, + ebpf::{FILE_OP_EVENT, IO_EVENT, PERM_OP_EVENT, PROC_LIFECYCLE_EVENT}, error::Error::{self, ParseEventData}, }; use crate::ebpf::SK_BPF_DATA; +// ── IoEventData offsets (matches user_io_event_buffer) ────────────────── const IO_OPERATION_OFFSET: usize = 4; const IO_LATENCY_OFFSET: usize = 8; const IO_OFF_BYTES_OFFSET: usize = 16; @@ -40,7 +41,11 @@ const IO_FILE_NAME_OFFSET: usize = 28; const IO_MOUNT_SOURCE_OFFSET: usize = 284; const IO_MOUNT_POINT_OFFSET: usize = 796; const IO_FILE_DIR_OFFSET: usize = 1052; -const IO_EVENT_BUFF_SIZE: usize = 1564; +const IO_MNT_ID_OFFSET: usize = 1564; +const IO_MNTNS_ID_OFFSET: usize = 1568; +const IO_ACCESS_PERMISSION_OFFSET: usize = 1572; +const IO_EVENT_BUFF_SIZE: usize = 1574; + struct IoEventData { bytes_count: u32, // Number of bytes read and written operation: u32, // 0: write 1: read @@ -51,6 +56,7 @@ struct IoEventData { mount_source: Vec, mount_point: Vec, file_dir: Vec, + access_permission: u16, // File permission bits (inode->i_mode & 0xFFF) } impl TryFrom<&[u8]> for IoEventData { @@ -66,7 +72,7 @@ impl TryFrom<&[u8]> for IoEventData { let length = raw_data.len(); if length < IO_EVENT_BUFF_SIZE { return Err(ParseEventData(format!( - "parse io event data failed, raw data length: {length} < {IO_OFF_BYTES_OFFSET}" + "parse io event data failed, raw data length: {length} < {IO_EVENT_BUFF_SIZE}" ))); } let 
io_event_data = Self { @@ -79,6 +85,7 @@ mount_source: parse_cstring_slice(&raw_data[IO_MOUNT_SOURCE_OFFSET..]), mount_point: parse_cstring_slice(&raw_data[IO_MOUNT_POINT_OFFSET..]), file_dir: parse_cstring_slice(&raw_data[IO_FILE_DIR_OFFSET..]), + access_permission: read_u16_le(&raw_data[IO_ACCESS_PERMISSION_OFFSET..]), }; Ok(io_event_data) } @@ -96,13 +103,204 @@ impl From<IoEventData> for metric::IoEventData { mount_point: io_event_data.mount_point, file_dir: io_event_data.file_dir, file_type: io_event_data.file_type as i32, + access_permission: io_event_data.access_permission as u32, } } } +// ── FileOpEventData offsets (packed __ai_agent_file_op_event) ─────────── +// Layout: event_type(1) + pid(4) + uid(4) + gid(4) + mode(4) + timestamp(8) + filename(256) +const FILE_OP_MIN_SIZE: usize = 25; // without filename +const FILE_OP_PID_OFF: usize = 1; +const FILE_OP_UID_OFF: usize = 5; +const FILE_OP_GID_OFF: usize = 9; +const FILE_OP_MODE_OFF: usize = 13; +const FILE_OP_TS_OFF: usize = 17; +const FILE_OP_FNAME_OFF: usize = 25; + +struct FileOpEventData { + op_type: u8, + pid: u32, + uid: u32, + gid: u32, + mode: u32, + timestamp: u64, + filename: Vec<u8>, +} + +impl TryFrom<&[u8]> for FileOpEventData { + type Error = Error; + + fn try_from(raw: &[u8]) -> Result<Self, Self::Error> { + if raw.len() < FILE_OP_MIN_SIZE { + return Err(ParseEventData(format!( + "file_op event too short: {} < {FILE_OP_MIN_SIZE}", + raw.len() + ))); + } + let filename = if raw.len() > FILE_OP_FNAME_OFF { + let slice = &raw[FILE_OP_FNAME_OFF..]; + match slice.iter().position(|&b| b == b'\0') { + Some(i) => slice[..i].to_vec(), + None => slice.to_vec(), + } + } else { + vec![] + }; + Ok(Self { + op_type: raw[0], + pid: read_u32_le(&raw[FILE_OP_PID_OFF..]), + uid: read_u32_le(&raw[FILE_OP_UID_OFF..]), + gid: read_u32_le(&raw[FILE_OP_GID_OFF..]), + mode: read_u32_le(&raw[FILE_OP_MODE_OFF..]), + timestamp: read_u64_le(&raw[FILE_OP_TS_OFF..]), + filename, + }) + } +} + +impl From<FileOpEventData> for 
metric::FileOpEventData { + fn from(d: FileOpEventData) -> Self { + Self { + op_type: d.op_type as i32, + pid: d.pid, + uid: d.uid, + gid: d.gid, + mode: d.mode, + timestamp: d.timestamp, + filename: d.filename, + } + } +} + +// ── PermOpEventData offsets (packed __ai_agent_perm_event) ────────────── +// Layout: event_type(1) + pid(4) + old_uid(4) + old_gid(4) + new_uid(4) + new_gid(4) + timestamp(8) +const PERM_OP_SIZE: usize = 29; +const PERM_OP_PID_OFF: usize = 1; +const PERM_OP_OLD_UID_OFF: usize = 5; +const PERM_OP_OLD_GID_OFF: usize = 9; +const PERM_OP_NEW_UID_OFF: usize = 13; +const PERM_OP_NEW_GID_OFF: usize = 17; +const PERM_OP_TS_OFF: usize = 21; + +struct PermOpEventData { + op_type: u8, + pid: u32, + old_uid: u32, + old_gid: u32, + new_uid: u32, + new_gid: u32, + timestamp: u64, +} + +impl TryFrom<&[u8]> for PermOpEventData { + type Error = Error; + + fn try_from(raw: &[u8]) -> Result<Self, Self::Error> { + if raw.len() < PERM_OP_SIZE { + return Err(ParseEventData(format!( + "perm_op event too short: {} < {PERM_OP_SIZE}", + raw.len() + ))); + } + Ok(Self { + op_type: raw[0], + pid: read_u32_le(&raw[PERM_OP_PID_OFF..]), + old_uid: read_u32_le(&raw[PERM_OP_OLD_UID_OFF..]), + old_gid: read_u32_le(&raw[PERM_OP_OLD_GID_OFF..]), + new_uid: read_u32_le(&raw[PERM_OP_NEW_UID_OFF..]), + new_gid: read_u32_le(&raw[PERM_OP_NEW_GID_OFF..]), + timestamp: read_u64_le(&raw[PERM_OP_TS_OFF..]), + }) + } +} + +impl From<PermOpEventData> for metric::PermOpEventData { + fn from(d: PermOpEventData) -> Self { + Self { + op_type: d.op_type as i32, + pid: d.pid, + old_uid: d.old_uid, + old_gid: d.old_gid, + new_uid: d.new_uid, + new_gid: d.new_gid, + timestamp: d.timestamp, + } + } +} + +// ── ProcLifecycleEventData offsets (packed __ai_agent_proc_event) ─────── +// Layout: event_type(1) + pid(4) + parent_pid(4) + uid(4) + gid(4) + timestamp(8) + comm(16) +const PROC_LIFECYCLE_MIN_SIZE: usize = 25; // without comm +const PROC_LC_PID_OFF: usize = 1; +const PROC_LC_PPID_OFF: usize = 5; +const PROC_LC_UID_OFF: usize 
= 9; +const PROC_LC_GID_OFF: usize = 13; +const PROC_LC_TS_OFF: usize = 17; +const PROC_LC_COMM_OFF: usize = 25; + +struct ProcLifecycleEventData { + lifecycle_type: u8, + pid: u32, + parent_pid: u32, + uid: u32, + gid: u32, + timestamp: u64, + comm: Vec<u8>, +} + +impl TryFrom<&[u8]> for ProcLifecycleEventData { + type Error = Error; + + fn try_from(raw: &[u8]) -> Result<Self, Self::Error> { + if raw.len() < PROC_LIFECYCLE_MIN_SIZE { + return Err(ParseEventData(format!( + "proc_lifecycle event too short: {} < {PROC_LIFECYCLE_MIN_SIZE}", + raw.len() + ))); + } + let comm = if raw.len() > PROC_LC_COMM_OFF { + let slice = &raw[PROC_LC_COMM_OFF..]; + match slice.iter().position(|&b| b == b'\0') { + Some(i) => slice[..i].to_vec(), + None => slice.to_vec(), + } + } else { + vec![] + }; + Ok(Self { + lifecycle_type: raw[0], + pid: read_u32_le(&raw[PROC_LC_PID_OFF..]), + parent_pid: read_u32_le(&raw[PROC_LC_PPID_OFF..]), + uid: read_u32_le(&raw[PROC_LC_UID_OFF..]), + gid: read_u32_le(&raw[PROC_LC_GID_OFF..]), + timestamp: read_u64_le(&raw[PROC_LC_TS_OFF..]), + comm, + }) + } +} + +impl From<ProcLifecycleEventData> for metric::ProcLifecycleEventData { + fn from(d: ProcLifecycleEventData) -> Self { + Self { + lifecycle_type: d.lifecycle_type as i32, + pid: d.pid, + parent_pid: d.parent_pid, + uid: d.uid, + gid: d.gid, + timestamp: d.timestamp, + comm: d.comm, + } + } +} + +// ── EventData ────────────────────────────────────────────────────────── enum EventData { OtherEvent, IoEvent(IoEventData), + FileOpEvent(FileOpEventData), + PermOpEvent(PermOpEventData), + ProcLifecycleEvent(ProcLifecycleEventData), } impl Debug for EventData { @@ -116,21 +314,42 @@ d.latency, d.off_bytes )), + EventData::FileOpEvent(d) => f.write_fmt(format_args!( + "FileOpEventData {{ op_type: {}, pid: {}, filename: {} }}", + d.op_type, + d.pid, + str::from_utf8(&d.filename).unwrap_or("") + )), + EventData::PermOpEvent(d) => f.write_fmt(format_args!( + "PermOpEventData {{ op_type: {}, pid: {}, new_uid: {}, new_gid: {} 
}}", + d.op_type, d.pid, d.new_uid, d.new_gid + )), + EventData::ProcLifecycleEvent(d) => f.write_fmt(format_args!( + "ProcLifecycleEventData {{ type: {}, pid: {}, parent_pid: {} }}", + d.lifecycle_type, d.pid, d.parent_pid + )), _ => f.write_str("other event"), } } } +// ── EventType ────────────────────────────────────────────────────────── #[derive(PartialEq)] pub enum EventType { OtherEvent = 0, IoEvent = 1, + FileOpEvent = 2, + PermOpEvent = 3, + ProcLifecycleEvent = 4, } impl From for EventType { fn from(source: u8) -> Self { match source { IO_EVENT => Self::IoEvent, + FILE_OP_EVENT => Self::FileOpEvent, + PERM_OP_EVENT => Self::PermOpEvent, + PROC_LIFECYCLE_EVENT => Self::ProcLifecycleEvent, _ => Self::OtherEvent, } } @@ -147,10 +366,14 @@ impl fmt::Display for EventType { match self { Self::OtherEvent => write!(f, "other_event"), Self::IoEvent => write!(f, "io_event"), + Self::FileOpEvent => write!(f, "file_op_event"), + Self::PermOpEvent => write!(f, "perm_op_event"), + Self::ProcLifecycleEvent => write!(f, "proc_lifecycle_event"), } } } +// ── ProcEvent ────────────────────────────────────────────────────────── pub struct ProcEvent { pub pid: u32, pub pod_id: u32, @@ -175,10 +398,25 @@ impl ProcEvent { let mut end_time = 0; match event_type { EventType::IoEvent => { - let io_event_data = IoEventData::try_from(raw_data)?; // Try to parse IoEventData from data.cap_data + let io_event_data = IoEventData::try_from(raw_data)?; end_time = start_time + io_event_data.latency; event_data = EventData::IoEvent(io_event_data); } + EventType::FileOpEvent => { + let d = FileOpEventData::try_from(raw_data)?; + end_time = start_time; + event_data = EventData::FileOpEvent(d); + } + EventType::PermOpEvent => { + let d = PermOpEventData::try_from(raw_data)?; + end_time = start_time; + event_data = EventData::PermOpEvent(d); + } + EventType::ProcLifecycleEvent => { + let d = ProcLifecycleEventData::try_from(raw_data)?; + end_time = start_time; + event_data = 
EventData::ProcLifecycleEvent(d); + } _ => {} } @@ -230,8 +468,17 @@ impl Sendable for BoxedProcEvents { ..Default::default() }; match self.0.event_data { - EventData::IoEvent(io_event_data) => { - pb_proc_event.io_event_data = Some(io_event_data.into()) + EventData::IoEvent(d) => { + pb_proc_event.io_event_data = Some(d.into()); + } + EventData::FileOpEvent(d) => { + pb_proc_event.file_op_event_data = Some(d.into()); + } + EventData::PermOpEvent(d) => { + pb_proc_event.perm_op_event_data = Some(d.into()); + } + EventData::ProcLifecycleEvent(d) => { + pb_proc_event.proc_lifecycle_event_data = Some(d.into()); } _ => {} } diff --git a/agent/src/config/config.rs b/agent/src/config/config.rs index 6d9d6503fc5..aa633524d68 100644 --- a/agent/src/config/config.rs +++ b/agent/src/config/config.rs @@ -558,6 +558,28 @@ pub struct SymbolTable { pub java: Java, } +#[derive(Clone, Debug, Deserialize, PartialEq, Eq)] +#[serde(default)] +pub struct AiAgentConfig { + pub http_endpoints: Vec, + pub max_payload_size: usize, + pub file_io_enabled: bool, +} + +impl Default for AiAgentConfig { + fn default() -> Self { + Self { + http_endpoints: vec![ + "/v1/chat/completions".to_string(), + "/v1/embeddings".to_string(), + "/v1/responses".to_string(), + ], + max_payload_size: 0, // 0 means unlimited + file_io_enabled: true, + } + } +} + #[derive(Clone, Debug, Deserialize, PartialEq, Eq)] #[serde(default)] pub struct Proc { @@ -572,6 +594,7 @@ pub struct Proc { pub process_blacklist: Vec, pub process_matcher: Vec, pub symbol_table: SymbolTable, + pub ai_agent: AiAgentConfig, } impl Default for Proc { @@ -658,6 +681,7 @@ impl Default for Proc { }, ], symbol_table: SymbolTable::default(), + ai_agent: AiAgentConfig::default(), }; p.process_blacklist.sort_unstable(); p.process_blacklist.dedup(); diff --git a/agent/src/config/handler.rs b/agent/src/config/handler.rs index 4ba453c60ff..a9d7d63acbc 100755 --- a/agent/src/config/handler.rs +++ b/agent/src/config/handler.rs @@ -1150,6 +1150,9 @@ 
pub struct LogParserConfig { pub unconcerned_dns_nxdomain_trie: DomainNameTrie, pub mysql_decompress_payload: bool, pub custom_app: CustomAppConfig, + pub ai_agent_endpoints: Vec, + pub ai_agent_max_payload_size: usize, + pub ai_agent_file_io_enabled: bool, } impl Default for LogParserConfig { @@ -1169,6 +1172,13 @@ impl Default for LogParserConfig { unconcerned_dns_nxdomain_trie: DomainNameTrie::default(), mysql_decompress_payload: true, custom_app: CustomAppConfig::default(), + ai_agent_endpoints: vec![ + "/v1/chat/completions".to_string(), + "/v1/embeddings".to_string(), + "/v1/responses".to_string(), + ], + ai_agent_max_payload_size: usize::MAX, // default: unlimited (config 0 → usize::MAX) + ai_agent_file_io_enabled: true, } } } @@ -1215,6 +1225,8 @@ impl fmt::Debug for LogParserConfig { ) .field("mysql_decompress_payload", &self.mysql_decompress_payload) .field("custom_app", &self.custom_app) + .field("ai_agent_endpoints", &self.ai_agent_endpoints) + .field("ai_agent_max_payload_size", &self.ai_agent_max_payload_size) .finish() } } @@ -1288,6 +1300,7 @@ pub struct EbpfConfig { pub agent_id: u16, pub epc_id: u32, pub l7_log_packet_size: usize, + pub ai_agent_max_payload_size: usize, // 静态配置 pub l7_protocol_inference_max_fail_count: usize, pub l7_protocol_inference_ttl: usize, @@ -1313,6 +1326,7 @@ impl fmt::Debug for EbpfConfig { .field("agent_id", &self.agent_id) .field("epc_id", &self.epc_id) .field("l7_log_packet_size", &self.l7_log_packet_size) + .field("ai_agent_max_payload_size", &self.ai_agent_max_payload_size) .field( "l7_protocol_inference_max_fail_count", &self.l7_protocol_inference_max_fail_count, @@ -2335,6 +2349,13 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig { None }, }, + ai_agent_endpoints: conf.inputs.proc.ai_agent.http_endpoints.clone(), + ai_agent_max_payload_size: if conf.inputs.proc.ai_agent.max_payload_size == 0 { + usize::MAX // 0 means unlimited + } else { + conf.inputs.proc.ai_agent.max_payload_size + }, + 
ai_agent_file_io_enabled: conf.inputs.proc.ai_agent.file_io_enabled, }, debug: DebugConfig { agent_id: conf.global.common.agent_id as u16, @@ -2372,6 +2393,7 @@ impl TryFrom<(Config, UserConfig)> for ModuleConfig { epc_id: conf.global.common.vpc_id, l7_log_packet_size: crate::ebpf::CAP_LEN_MAX .min(conf.processors.request_log.tunning.payload_truncation as usize), + ai_agent_max_payload_size: conf.inputs.proc.ai_agent.max_payload_size, l7_log_tap_types: generate_tap_types_array( &conf.outputs.flow_log.filters.l7_capture_network_types, ), @@ -5648,6 +5670,15 @@ impl ConfigHandler { }, ..new_config.log_parser.clone() }; + + // Propagate file_io_enabled toggle to AiAgentRegistry + #[cfg(feature = "enterprise")] + { + if let Some(registry) = enterprise_utils::ai_agent::global_registry() { + registry + .set_file_io_enabled(candidate_config.log_parser.ai_agent_file_io_enabled); + } + } } if candidate_config.synchronizer != new_config.synchronizer { diff --git a/agent/src/ebpf/Makefile b/agent/src/ebpf/Makefile index 687c5a7bb45..d377e8afdcc 100644 --- a/agent/src/ebpf/Makefile +++ b/agent/src/ebpf/Makefile @@ -117,6 +117,7 @@ ifeq ($(GCC_VER_GTE71),1) CFLAGS += -Wformat-truncation=0 endif CFLAGS += $(JAVA_AGENT_MACROS) +CFLAGS += $(EXTRA_CFLAGS) CTLSRC := user/utils.c user/ctrl_tracer.c user/ctrl.c user/log.c CTLBIN := deepflow-ebpfctl diff --git a/agent/src/ebpf/kernel/files_rw.bpf.c b/agent/src/ebpf/kernel/files_rw.bpf.c index c95e16e8324..55df6d89b80 100644 --- a/agent/src/ebpf/kernel/files_rw.bpf.c +++ b/agent/src/ebpf/kernel/files_rw.bpf.c @@ -370,10 +370,21 @@ static __inline int trace_io_event_common(void *ctx, latency = TIME_ROLLBACK_DEFAULT_LATENCY_NS; } +#ifdef EXTENDED_AI_AGENT_FILE_IO + if (is_ai_agent_process(pid_tgid)) { + goto skip_latency_filter; + } +#endif + if (latency < tracer_ctx->io_event_minimal_duration) { return -1; } +#ifdef EXTENDED_AI_AGENT_FILE_IO +skip_latency_filter: + ; /* null statement - labels cannot be followed by declarations in C */ 
+#endif + struct __io_event_buffer *buffer = io_event_buffer__lookup(&k0); if (!buffer) { return -1; @@ -382,6 +393,12 @@ static __inline int trace_io_event_common(void *ctx, buffer->bytes_count = data_args->bytes_count; buffer->latency = latency; buffer->operation = direction; +#ifdef EXTENDED_AI_AGENT_FILE_IO + buffer->access_permission = + ai_agent_get_access_permission(pid_tgid, data_args->fd, offset); +#else + buffer->access_permission = 0; +#endif struct __socket_data_buffer *v_buff = bpf_map_lookup_elem(&NAME(data_buf), &k0); if (!v_buff) diff --git a/agent/src/ebpf/kernel/include/common.h b/agent/src/ebpf/kernel/include/common.h index 9a89b5e7f80..f3e00f40300 100644 --- a/agent/src/ebpf/kernel/include/common.h +++ b/agent/src/ebpf/kernel/include/common.h @@ -113,6 +113,9 @@ enum process_data_extra_source { DATA_SOURCE_RESERVED, DATA_SOURCE_DPDK, DATA_SOURCE_UNIX_SOCKET, + DATA_SOURCE_FILE_OP_EVENT, + DATA_SOURCE_PERM_OP_EVENT, + DATA_SOURCE_PROC_LIFECYCLE_EVENT, }; struct protocol_message_t { diff --git a/agent/src/ebpf/kernel/include/protocol_inference.h b/agent/src/ebpf/kernel/include/protocol_inference.h index 400f2f04bbf..203cec8796c 100644 --- a/agent/src/ebpf/kernel/include/protocol_inference.h +++ b/agent/src/ebpf/kernel/include/protocol_inference.h @@ -3942,6 +3942,12 @@ static __inline void check_and_set_data_reassembly(struct conn_info_s tracer_ctx_map__lookup(&k0); if (tracer_ctx == NULL) return; + __u32 data_limit_max = tracer_ctx->data_limit_max; +#ifdef EXTENDED_AI_AGENT_FILE_IO + if (conn_info->socket_info_ptr->is_ai_agent) + data_limit_max = + tracer_ctx->ai_agent_data_limit_max; +#endif /* * Here, the length is checked, and if it has already reached * the configured limit, assembly will not proceed. @@ -3956,9 +3962,9 @@ static __inline void check_and_set_data_reassembly(struct conn_info_s * reassembly is needed (whether to decide to push to the upper layer * for reassembly). 
*/ - if (conn_info->socket_info_ptr->reasm_bytes >= - tracer_ctx->data_limit_max - || conn_info->prev_count > 0) + if ((data_limit_max > 0 && + conn_info->socket_info_ptr->reasm_bytes >= + data_limit_max) || conn_info->prev_count > 0) conn_info->enable_reasm = false; } else { conn_info->enable_reasm = false; diff --git a/agent/src/ebpf/kernel/include/socket_trace.h b/agent/src/ebpf/kernel/include/socket_trace.h index 1d1d0ef8b9d..3124c717de5 100644 --- a/agent/src/ebpf/kernel/include/socket_trace.h +++ b/agent/src/ebpf/kernel/include/socket_trace.h @@ -360,6 +360,26 @@ struct sched_comm_exit_ctx { #endif }; +/* sched_process_fork tracepoint context */ +struct sched_comm_fork_ctx { +#ifdef LINUX_VER_RT + __u64 __pad_0; /* 0 8 */ + unsigned char common_migrate_disable; /* 8 1 */ + unsigned char common_preempt_lazy_count; /* 9 1 */ + unsigned short padding; + char parent_comm[16]; /* 12 16 */ + pid_t parent_pid; /* 28 4 */ + char child_comm[16]; /* 32 16 */ + pid_t child_pid; /* 48 4 */ +#else + __u64 __pad_0; /* 0 8 */ + char parent_comm[16]; /* offset:8; size:16 */ + pid_t parent_pid; /* offset:24; size:4 */ + char child_comm[16]; /* offset:28; size:16 */ + pid_t child_pid; /* offset:44; size:4 */ +#endif +}; + struct syscall_sendto_enter_ctx { #ifdef LINUX_VER_RT __u64 __pad_0; /* 0 8 */ @@ -388,25 +408,7 @@ struct syscall_sendto_enter_ctx { #endif }; -struct sched_comm_fork_ctx { -#ifdef LINUX_VER_RT - __u64 __pad_0; /* 0 8 */ - unsigned char common_migrate_disable; /* 8 1 */ - unsigned char common_preempt_lazy_count; /* 9 1 */ - unsigned short padding; - char parent_comm[16]; /* 12 16 */ - - __u32 parent_pid; /* 28 4 */ - char child_comm[16]; /* 32 16 */ - __u32 child_pid; /* 48 4 */ -#else - __u64 __pad_0; /* 0 8 */ - char parent_comm[16]; /* 8 16 */ - __u32 parent_pid; /* 24 4 */ - char child_comm[16]; /* 28 16 */ - __u32 child_pid; /* 44 4 */ -#endif -}; +/* sched_comm_fork_ctx already defined above (near sched_comm_exit_ctx) */ struct sched_comm_exec_ctx { 
#ifdef LINUX_VER_RT diff --git a/agent/src/ebpf/kernel/include/socket_trace_common.h b/agent/src/ebpf/kernel/include/socket_trace_common.h index 4e4e84a6060..372977959dd 100644 --- a/agent/src/ebpf/kernel/include/socket_trace_common.h +++ b/agent/src/ebpf/kernel/include/socket_trace_common.h @@ -23,6 +23,7 @@ #define DF_BPF_SOCKET_TRACE_COMMON_H #define CAP_DATA_SIZE 1024 // For no-brust send buffer #define BURST_DATA_BUF_SIZE 16384 // For brust send buffer +#define AI_AGENT_DATA_LIMIT_MAX_UNLIMITED 0x7fffffff #include "../config.h" @@ -181,7 +182,8 @@ struct socket_info_s { */ __u16 no_trace:1; __u16 data_source:4; // The source of the stored data, defined in the 'enum process_data_extra_source'. - __u16 unused_bits:7; + __u16 is_ai_agent:1; + __u16 unused_bits:6; __u32 reasm_bytes; // The amount of data bytes that have been reassembled. /* @@ -237,6 +239,7 @@ struct tracer_ctx_s { __u64 coroutine_trace_id; /**< Data forwarding association within the same coroutine */ __u64 thread_trace_id; /**< Data forwarding association within the same process/thread, used for multi-transaction scenarios */ __u32 data_limit_max; /**< Maximum number of data transfers */ + __u32 ai_agent_data_limit_max; /**< AI Agent max reassembly limit (0 = unlimited) */ __u32 go_tracing_timeout; /**< Go tracing timeout */ __u32 io_event_collect_mode; /**< IO event collection mode */ __u64 io_event_minimal_duration; /**< Minimum duration for IO events */ @@ -290,6 +293,9 @@ struct __io_event_buffer { // Mount namespace ID of the file’s mount __u32 mntns_id; + // File access permission bits (inode->i_mode & 0xFFF) + __u16 access_permission; + // filename length __u32 len; @@ -318,6 +324,7 @@ struct user_io_event_buffer { char file_dir[FILE_PATH_SZ]; int mnt_id; __u32 mntns_id; + __u16 access_permission; } __attribute__ ((packed)); // struct ebpf_proc_info -> offsets[] arrays index. 
diff --git a/agent/src/ebpf/kernel/socket_trace.bpf.c b/agent/src/ebpf/kernel/socket_trace.bpf.c index 291cf8bed06..20a7a1fd540 100644 --- a/agent/src/ebpf/kernel/socket_trace.bpf.c +++ b/agent/src/ebpf/kernel/socket_trace.bpf.c @@ -39,6 +39,25 @@ #define __user +#ifdef EXTENDED_AI_AGENT_FILE_IO +#ifndef AI_AGENT_PROC_FORK +#define AI_AGENT_PROC_FORK 1 +#define AI_AGENT_PROC_EXEC 2 +#define AI_AGENT_PROC_EXIT 3 +#endif + +static __inline int is_ai_agent_process(__u64 pid_tgid); +static __inline int ai_agent_submit_event(void *ctx, __u8 source, + void *event, __u32 event_sz, + __u64 pid_tgid); +static __inline int ai_agent_emit_proc_event(void *ctx, __u8 event_type, + __u32 pid, __u32 parent_pid, + __u64 pid_tgid); +static __inline void ai_agent_cleanup_proc_pid(__u32 tgid); +static __inline int ai_agent_on_fork(void *ctx, __u32 parent_tgid, + __u32 child_tgid); +#endif + /* *INDENT-OFF* */ /*********************************************************** * map definitions @@ -1364,6 +1383,14 @@ __data_submit(struct pt_regs *ctx, struct conn_info_s *conn_info, * so they are saved here. */ int data_max_sz = tracer_ctx->data_limit_max; +#ifdef EXTENDED_AI_AGENT_FILE_IO + if (is_ai_agent_process(((__u64)tgid) << 32)) { + __u32 ai_limit = tracer_ctx->ai_agent_data_limit_max; + data_max_sz = ai_limit == 0 ? 
+ AI_AGENT_DATA_LIMIT_MAX_UNLIMITED : ai_limit; + sk_info->is_ai_agent = 1; + } +#endif struct trace_stats *trace_stats = trace_stats_map__lookup(&k0); if (trace_stats == NULL) @@ -1525,6 +1552,9 @@ __data_submit(struct pt_regs *ctx, struct conn_info_s *conn_info, if (is_socket_info_valid(socket_info_ptr)) { sk_info->uid = socket_info_ptr->uid; sk_info->allow_reassembly = socket_info_ptr->allow_reassembly; +#ifdef EXTENDED_AI_AGENT_FILE_IO + socket_info_ptr->is_ai_agent = sk_info->is_ai_agent; +#endif /* * The kernel syscall interface determines that it is the TLS @@ -2870,6 +2900,13 @@ static __inline void __push_close_event(__u64 pid_tgid, __u64 uid, __u64 seq, if (tracer_ctx == NULL) return; int data_max_sz = tracer_ctx->data_limit_max; +#ifdef EXTENDED_AI_AGENT_FILE_IO + if (is_ai_agent_process(pid_tgid)) { + __u32 ai_limit = tracer_ctx->ai_agent_data_limit_max; + data_max_sz = ai_limit == 0 ? + AI_AGENT_DATA_LIMIT_MAX_UNLIMITED : ai_limit; + } +#endif struct __socket_data_buffer *v_buff = bpf_map_lookup_elem(&NAME(data_buf), &k0); if (!v_buff) diff --git a/agent/src/ebpf/kernel/uprobe_base.bpf.c b/agent/src/ebpf/kernel/uprobe_base.bpf.c index 102c2117dc4..60157454ab0 100644 --- a/agent/src/ebpf/kernel/uprobe_base.bpf.c +++ b/agent/src/ebpf/kernel/uprobe_base.bpf.c @@ -636,6 +636,13 @@ static __inline int do_process_exit(void *ctx) bpf_get_current_comm(data.name, sizeof(data.name)); bpf_perf_event_output(ctx, &NAME(socket_data), BPF_F_CURRENT_CPU, &data, sizeof(data)); +#ifdef EXTENDED_AI_AGENT_FILE_IO + if (is_ai_agent_process(id)) { + ai_agent_emit_proc_event(ctx, AI_AGENT_PROC_EXIT, + pid, 0, id); + ai_agent_cleanup_proc_pid(pid); + } +#endif } bpf_map_delete_elem(&goroutines_map, &id); @@ -747,6 +754,11 @@ static __inline int __process_exec(void *ctx) bpf_perf_event_output(ctx, &NAME(socket_data), BPF_F_CURRENT_CPU, &data, sizeof(data)); } +#ifdef EXTENDED_AI_AGENT_FILE_IO + if (is_ai_agent_process(id)) { + ai_agent_emit_proc_event(ctx, AI_AGENT_PROC_EXEC, 
pid, 0, id); + } +#endif return 0; } diff --git a/agent/src/ebpf/mod.rs b/agent/src/ebpf/mod.rs index e47bb80995c..a7efbab0420 100644 --- a/agent/src/ebpf/mod.rs +++ b/agent/src/ebpf/mod.rs @@ -157,6 +157,12 @@ pub const DATA_SOURCE_OPENSSL_UPROBE: u8 = 3; #[allow(dead_code)] pub const DATA_SOURCE_IO_EVENT: u8 = 4; #[allow(dead_code)] +pub const DATA_SOURCE_FILE_OP_EVENT: u8 = 9; +#[allow(dead_code)] +pub const DATA_SOURCE_PERM_OP_EVENT: u8 = 10; +#[allow(dead_code)] +pub const DATA_SOURCE_PROC_LIFECYCLE_EVENT: u8 = 11; +#[allow(dead_code)] pub const DATA_SOURCE_GO_HTTP2_DATAFRAME_UPROBE: u8 = 5; #[allow(dead_code)] pub const DATA_SOURCE_UNIX_SOCKET: u8 = 8; @@ -568,6 +574,7 @@ extern "C" { * @return the set maximum buffer size value on success, < 0 on failure. */ pub fn set_data_limit_max(limit_size: c_int) -> c_int; + pub fn set_ai_agent_data_limit_max(limit_size: c_int) -> c_int; pub fn set_go_tracing_timeout(timeout: c_int) -> c_int; pub fn set_io_event_collect_mode(mode: c_int) -> c_int; pub fn set_io_event_minimal_duration(duration: c_ulonglong) -> c_int; @@ -836,6 +843,16 @@ extern "C" { pub fn enable_fentry(); pub fn set_virtual_file_collect(enabled: bool) -> c_int; + // BPF map helpers for u32-key maps (used by AI Agent PID tracking) + pub fn bpf_table_get_map_fd(tracer_name: *const c_char, map_name: *const c_char) -> c_int; + pub fn bpf_table_update_u32_key( + map_fd: c_int, + key: c_uint, + val_buf: *const c_void, + val_size: c_int, + ) -> c_int; + pub fn bpf_table_delete_u32_key(map_fd: c_int, key: c_uint) -> c_int; + cfg_if::cfg_if! 
{ if #[cfg(feature = "extended_observability")] { pub fn enable_offcpu_profiler() -> c_int; diff --git a/agent/src/ebpf/user/mount.c b/agent/src/ebpf/user/mount.c index 9ab27d7d90b..b7b34805a1a 100644 --- a/agent/src/ebpf/user/mount.c +++ b/agent/src/ebpf/user/mount.c @@ -792,6 +792,7 @@ u32 copy_file_metrics(int pid, void *dst, void *src, int len, u_event->file_type = file_type; u_event->mnt_id = event->mnt_id; u_event->mntns_id = event->mntns_id; + u_event->access_permission = event->access_permission; strcpy_s_inline(u_event->mount_source, sizeof(u_event->mount_source), mount_source, strlen(mount_source)); fast_strncat_trunc(mntns_str, mount_point, u_event->mount_point, diff --git a/agent/src/ebpf/user/socket.c b/agent/src/ebpf/user/socket.c index 3ba3d766fb3..fbf759e3904 100644 --- a/agent/src/ebpf/user/socket.c +++ b/agent/src/ebpf/user/socket.c @@ -22,6 +22,7 @@ #include #include #include +#include #include #include #include "clib.h" @@ -117,6 +118,7 @@ static pthread_mutex_t datadump_mutex; * Set by set_data_limit_max() */ static uint32_t socket_data_limit_max; +static uint32_t ai_agent_data_limit_max; static uint32_t go_tracing_timeout = GO_TRACING_TIMEOUT_DEFAULT; @@ -272,6 +274,31 @@ static inline void config_probes_for_proc_event(struct tracer_probes_conf *tps) } } +#ifdef EXTENDED_AI_AGENT_FILE_IO +static inline void config_probes_for_ai_agent(struct tracer_probes_conf *tps) +{ + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_openat"); + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_unlinkat"); + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_unlink"); + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_fchmodat"); + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_chmod"); + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_fchownat"); + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_chown"); + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_setuid"); + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_setgid"); + 
tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_setreuid"); + tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_setregid"); + tps_set_symbol(tps, "tracepoint/sched/sched_process_fork"); + tps_set_symbol(tps, "tracepoint/sched/sched_process_exec"); + tps_set_symbol(tps, "tracepoint/sched/sched_process_exit"); +} +#else +static inline void config_probes_for_ai_agent(struct tracer_probes_conf *tps) +{ + (void)tps; +} +#endif + static void config_probes_for_kfunc(struct tracer_probes_conf *tps) { kfunc_set_sym_for_entry_and_exit(tps, "ksys_write"); @@ -292,6 +319,7 @@ static void config_probes_for_kfunc(struct tracer_probes_conf *tps) kfunc_set_symbol(tps, "__sys_accept4", true); kfunc_set_symbol(tps, "__sys_connect", false); config_probes_for_proc_event(tps); + config_probes_for_ai_agent(tps); /* * On certain kernels, such as 5.15.0-127-generic and 5.10.134-18.al8.x86_64, @@ -402,6 +430,7 @@ static void config_probes_for_kprobe_and_tracepoint(struct tracer_probes_conf tps_set_symbol(tps, "tracepoint/syscalls/sys_enter_pwritev2"); tps_set_symbol(tps, "tracepoint/syscalls/sys_exit_pwritev2"); } + config_probes_for_ai_agent(tps); } static inline void __config_kprobe(struct tracer_probes_conf *tps, @@ -465,6 +494,7 @@ static void config_probes_for_kprobe(struct tracer_probes_conf *tps) probes_set_enter_symbol(tps, "__close_fd"); probes_set_exit_symbol(tps, "__sys_socket"); probes_set_enter_symbol(tps, "__sys_connect"); + config_probes_for_ai_agent(tps); } static void socket_tracer_set_probes(struct tracer_probes_conf *tps) @@ -2295,6 +2325,22 @@ static inline int __set_data_limit_max(int limit_size) return socket_data_limit_max; } +static inline int __set_ai_agent_data_limit_max(unsigned int limit_size) +{ + if (limit_size == 0) { + ai_agent_data_limit_max = 0; + } else if (limit_size > INT_MAX) { + ai_agent_data_limit_max = INT_MAX; + } else { + ai_agent_data_limit_max = limit_size; + } + + ebpf_info("Received ai_agent limit_size (%d), the final value is set to 
'%u'\n", + limit_size, ai_agent_data_limit_max); + + return ai_agent_data_limit_max; +} + /** * Set maximum amount of data passed to the agent by eBPF programe. * @limit_size : The maximum length of data. If @limit_size exceeds 16384, @@ -2343,6 +2389,44 @@ int set_data_limit_max(int limit_size) return set_val; } +int set_ai_agent_data_limit_max(int limit_size) +{ + int set_val = __set_ai_agent_data_limit_max(limit_size); + if (set_val < 0) + return set_val; + + struct bpf_tracer *tracer = find_bpf_tracer(SK_TRACER_NAME); + if (tracer == NULL) { + /* + * Called before running_socket_tracer(), + * no need to update config map + */ + return set_val; + } + + int cpu; + int nr_cpus = get_num_possible_cpus(); + struct tracer_ctx_s values[nr_cpus]; + memset(values, 0, sizeof(values)); + + if (!bpf_table_get_value(tracer, MAP_TRACER_CTX_NAME, 0, values)) { + ebpf_warning("Get map '%s' failed.\n", MAP_TRACER_CTX_NAME); + return ETR_NOTEXIST; + } + + for (cpu = 0; cpu < nr_cpus; cpu++) { + values[cpu].ai_agent_data_limit_max = set_val; + } + + if (!bpf_table_set_value + (tracer, MAP_TRACER_CTX_NAME, 0, (void *)&values)) { + ebpf_warning("Set '%s' failed\n", MAP_TRACER_CTX_NAME); + return ETR_UPDATE_MAP_FAILD; + } + + return set_val; +} + int set_go_tracing_timeout(int timeout) { go_tracing_timeout = timeout; @@ -3028,6 +3112,8 @@ int running_socket_tracer(tracer_callback_t handle, // Set default maximum amount of data passed to the agent by eBPF. 
if (socket_data_limit_max == 0) __set_data_limit_max(0); + if (ai_agent_data_limit_max == 0) + __set_ai_agent_data_limit_max(0); uint64_t uid_base = (gettime(CLOCK_REALTIME, TIME_TYPE_NAN) / 100) & 0xffffffffffffffULL; @@ -3042,6 +3128,7 @@ int running_socket_tracer(tracer_callback_t handle, t_conf[cpu].coroutine_trace_id = t_conf[cpu].socket_id; t_conf[cpu].thread_trace_id = t_conf[cpu].socket_id; t_conf[cpu].data_limit_max = socket_data_limit_max; + t_conf[cpu].ai_agent_data_limit_max = ai_agent_data_limit_max; t_conf[cpu].io_event_collect_mode = io_event_collect_mode; t_conf[cpu].io_event_minimal_duration = io_event_minimal_duration; @@ -3056,6 +3143,7 @@ int running_socket_tracer(tracer_callback_t handle, return -EINVAL; ebpf_info("Config socket_data_limit_max: %d\n", socket_data_limit_max); + ebpf_info("Config ai_agent_data_limit_max: %u\n", ai_agent_data_limit_max); ebpf_info("Config io_event_collect_mode: %d\n", io_event_collect_mode); ebpf_info("Config io_event_minimal_duration: %llu ns\n", io_event_minimal_duration); ebpf_info("Config virtual_file_collect_enable: %d\n", virtual_file_collect_enable); diff --git a/agent/src/ebpf/user/socket.h b/agent/src/ebpf/user/socket.h index b09799fce57..fd5e72fcd74 100644 --- a/agent/src/ebpf/user/socket.h +++ b/agent/src/ebpf/user/socket.h @@ -408,6 +408,7 @@ prefetch_and_process_data(struct bpf_tracer *t, int id, int nb_rx, void **datas_ } int set_data_limit_max(int limit_size); +int set_ai_agent_data_limit_max(int limit_size); int set_go_tracing_timeout(int timeout); int set_io_event_collect_mode(uint32_t mode); int set_io_event_minimal_duration(uint64_t duration); diff --git a/agent/src/ebpf/user/table.c b/agent/src/ebpf/user/table.c index 72ec4822222..eb57cf23fe8 100644 --- a/agent/src/ebpf/user/table.c +++ b/agent/src/ebpf/user/table.c @@ -154,3 +154,41 @@ int bpf_table_get_fd(struct bpf_tracer *tracer, const char *tb_name) return map->fd; } + +int bpf_table_get_map_fd(const char *tracer_name, const char 
*map_name) +{ + struct bpf_tracer *tracer = find_bpf_tracer(tracer_name); + if (tracer == NULL) { + ebpf_warning("[%s] tracer \"%s\" not found.\n", __func__, + tracer_name); + return -1; + } + + return bpf_table_get_fd(tracer, map_name); +} + +int bpf_table_update_u32_key(int map_fd, uint32_t key, void *val_buf, + int val_size) +{ + (void)val_size; + if (bpf_update_elem(map_fd, &key, val_buf, BPF_ANY) != 0) { + ebpf_warning("[%s] bpf_map_update_elem failed, fd: %d, " + "key: %u, err: %s\n", __func__, map_fd, + key, strerror(errno)); + return -1; + } + + return 0; +} + +int bpf_table_delete_u32_key(int map_fd, uint32_t key) +{ + if (bpf_delete_elem(map_fd, &key) != 0) { + ebpf_debug("[%s] bpf_map_delete_elem failed, fd: %d, " + "key: %u, err: %s\n", __func__, map_fd, + key, strerror(errno)); + return -1; + } + + return 0; +} diff --git a/agent/src/ebpf/user/table.h b/agent/src/ebpf/user/table.h index 07e95c10879..15df114fa55 100644 --- a/agent/src/ebpf/user/table.h +++ b/agent/src/ebpf/user/table.h @@ -36,4 +36,9 @@ void insert_prog_to_map(struct bpf_tracer *tracer, const char *map_name, const char *prog_name, int key); int bpf_table_get_fd(struct bpf_tracer *tracer, const char *tb_name); + +int bpf_table_get_map_fd(const char *tracer_name, const char *map_name); +int bpf_table_update_u32_key(int map_fd, uint32_t key, void *val_buf, + int val_size); +int bpf_table_delete_u32_key(int map_fd, uint32_t key); #endif /* DF_BPF_TABLE_H */ diff --git a/agent/src/ebpf_dispatcher.rs b/agent/src/ebpf_dispatcher.rs index 8c471fd1c84..330761dc285 100644 --- a/agent/src/ebpf_dispatcher.rs +++ b/agent/src/ebpf_dispatcher.rs @@ -1064,7 +1064,7 @@ impl EbpfCollector { return Err(Error::EbpfRunningError); } - Self::ebpf_on_config_change(config.l7_log_packet_size); + Self::ebpf_on_config_change(config.l7_log_packet_size, config.ai_agent_max_payload_size); let ebpf_conf = &config.ebpf; let on_cpu = &ebpf_conf.profile.on_cpu; @@ -1279,10 +1279,26 @@ impl EbpfCollector { 
ebpf::bpf_tracer_finish(); + // Wire AI Agent PID → BPF map fd after all tracers are loaded + #[cfg(feature = "enterprise")] + { + use enterprise_utils::ai_agent::global_registry; + let fd = unsafe { + ebpf::bpf_table_get_map_fd(c"socket-trace".as_ptr(), c"__ai_agent_pids".as_ptr()) + }; + if fd >= 0 { + if let Some(registry) = global_registry() { + registry.set_bpf_map_fd(fd); + } + } else { + warn!("AI Agent: could not find __ai_agent_pids BPF map (fd={}), file I/O monitoring will not work", fd); + } + } + Ok(handle) } - fn ebpf_on_config_change(l7_log_packet_size: usize) { + fn ebpf_on_config_change(l7_log_packet_size: usize, ai_agent_max_payload_size: usize) { unsafe { let n = ebpf::set_data_limit_max(l7_log_packet_size as c_int); if n < 0 { @@ -1296,6 +1312,24 @@ impl EbpfCollector { l7_log_packet_size, n ); } + + let ai_agent_limit = if ai_agent_max_payload_size == 0 { + 0 + } else { + ai_agent_max_payload_size.min(i32::MAX as usize) as c_int + }; + let n = ebpf::set_ai_agent_data_limit_max(ai_agent_limit); + if n < 0 { + warn!( + "ebpf set ai_agent_max_payload_size({}) failed.", + ai_agent_max_payload_size + ); + } else if ai_agent_limit != 0 && n != ai_agent_limit { + info!( + "ebpf set ai_agent_max_payload_size to {}, actual effective configuration is {}.", + ai_agent_max_payload_size, n + ); + } } } @@ -1504,7 +1538,10 @@ impl EbpfCollector { } } - Self::ebpf_on_config_change(config.l7_log_packet_size); + Self::ebpf_on_config_change( + config.l7_log_packet_size, + config.ai_agent_max_payload_size, + ); #[cfg(feature = "extended_observability")] { diff --git a/agent/src/flow_generator/perf/mod.rs b/agent/src/flow_generator/perf/mod.rs index c9bcad51c03..6ff9ba828aa 100644 --- a/agent/src/flow_generator/perf/mod.rs +++ b/agent/src/flow_generator/perf/mod.rs @@ -41,6 +41,8 @@ use super::{ protocol_logs::AppProtoHead, }; +#[cfg(feature = "enterprise")] +use crate::common::flow::BIZ_TYPE_AI_AGENT; use crate::common::l7_protocol_log::L7PerfCache; use 
crate::common::{ flow::{Flow, L7PerfStats}, @@ -237,6 +239,12 @@ pub struct FlowLog { ntp_diff: Arc, obfuscate_cache: Option, + + // Enterprise: set to true when AI Agent traffic is detected (biz_type == BIZ_TYPE_AI_AGENT). + // When true, subsequent packets use ai_agent_max_payload_size instead of l7_log_packet_size + // to preserve full LLM request/response bodies for audit. + #[cfg(feature = "enterprise")] + is_ai_agent: bool, } impl FlowLog { @@ -272,6 +280,17 @@ impl FlowLog { remote_epc: i32, ) -> Result { if let Some(payload) = packet.get_l7() { + // Enterprise: AI Agent flows use a larger payload size to preserve full + // LLM request/response bodies for governance audit. + #[cfg(feature = "enterprise")] + let pkt_size = if self.is_ai_agent { + log_parser_config.ai_agent_max_payload_size + } else { + flow_config.l7_log_packet_size as usize + }; + #[cfg(not(feature = "enterprise"))] + let pkt_size = flow_config.l7_log_packet_size as usize; + let mut parse_param = ParseParam::new( &*packet, Some(self.perf_cache.clone()), @@ -285,7 +304,7 @@ impl FlowLog { #[cfg(any(target_os = "linux", target_os = "android"))] parse_param.set_counter(self.stats_counter.clone()); parse_param.set_rrt_timeout(self.rrt_timeout); - parse_param.set_buf_size(flow_config.l7_log_packet_size as usize); + parse_param.set_buf_size(pkt_size); parse_param.set_captured_byte(packet.get_captured_byte()); parse_param.set_oracle_conf(flow_config.oracle_parse_conf); parse_param.set_iso8583_conf(&flow_config.iso8583_parse_conf); @@ -304,7 +323,6 @@ impl FlowLog { let ret = parser.parse_payload( { - let pkt_size = flow_config.l7_log_packet_size as usize; if pkt_size > payload.len() { payload } else { @@ -314,6 +332,17 @@ impl FlowLog { &parse_param, ); + // Enterprise: detect AI Agent traffic from parsed result and set the flag + // so subsequent packets in this flow use the larger payload size. 
+ #[cfg(feature = "enterprise")] + if !self.is_ai_agent { + if let Ok(ref result) = ret { + if result.has_biz_type(BIZ_TYPE_AI_AGENT) { + self.is_ai_agent = true; + } + } + } + let mut cache_proto = |proto: L7ProtocolEnum| match packet.signal_source { SignalSource::EBPF => { app_table.set_protocol_from_ebpf(packet, proto, local_epc, remote_epc) @@ -589,6 +618,8 @@ impl FlowLog { l7_protocol_inference_ttl, ntp_diff, obfuscate_cache, + #[cfg(feature = "enterprise")] + is_ai_agent: false, }) } diff --git a/agent/src/flow_generator/protocol_logs/http.rs b/agent/src/flow_generator/protocol_logs/http.rs index 789ee9679f0..6c80dbfb8a0 100644 --- a/agent/src/flow_generator/protocol_logs/http.rs +++ b/agent/src/flow_generator/protocol_logs/http.rs @@ -72,6 +72,9 @@ if #[cfg(feature = "enterprise")] { use public::l7_protocol::NativeTag; use crate::flow_generator::protocol_logs::{auto_merge_custom_field, CUSTOM_FIELD_POLICY_PRIORITY}; + + use enterprise_utils::ai_agent::match_ai_agent_endpoint; + use crate::common::flow::BIZ_TYPE_AI_AGENT; } } @@ -307,6 +310,9 @@ pub struct HttpInfo { pub grpc_status_code: Option, endpoint: Option, + // set when AI Agent URL is detected (enterprise only) + #[serde(skip)] + protocol_str: Option, // set by wasm plugin #[l7_log(response_result)] custom_result: Option, @@ -913,6 +919,7 @@ impl From for L7ProtocolSendLog { user_agent: f.user_agent, referer: f.referer, rpc_service: f.service_name, + protocol_str: f.protocol_str, attributes: { if f.attributes.is_empty() { None @@ -1248,11 +1255,36 @@ impl HttpLog { info.service_name = info.grpc_package_service_name(); if !config.http_endpoint_disabled && info.path.len() > 0 { // Priority use of info.endpoint, because info.endpoint may be set by the wasm plugin - let path = match info.endpoint.as_ref() { - Some(p) if !p.is_empty() => p, - _ => &info.path, + let _endpoint_already_set = matches!(info.endpoint.as_ref(), Some(p) if !p.is_empty()); + let path_owned = if let Some(p) = 
info.endpoint.as_ref().filter(|p| !p.is_empty()) { + p.clone() + } else { + info.path.clone() }; - info.endpoint = Some(handle_endpoint(config, path)); + // Priority chain: WASM/biz_field > AI Agent detection > http_endpoint Trie + #[cfg(feature = "enterprise")] + let ai_agent_matched = if !_endpoint_already_set { + if let Some(matched_path) = match_ai_agent_endpoint( + &config.ai_agent_endpoints, + path_owned.as_str(), + param.process_id, + std::time::Duration::from_micros(param.time), + ) { + info.endpoint = Some(matched_path); + info.biz_type = BIZ_TYPE_AI_AGENT; + info.protocol_str = Some("LLM".to_string()); + true + } else { + false + } + } else { + false + }; + #[cfg(not(feature = "enterprise"))] + let ai_agent_matched = false; + if !ai_agent_matched { + info.endpoint = Some(handle_endpoint(config, &path_owned)); + } } let l7_dynamic_config = &config.l7_log_dynamic; @@ -2809,6 +2841,7 @@ mod tests { iso8583_parse_conf: Iso8583ParseConfig::default(), web_sphere_mq_parse_conf: WebSphereMqParseConfig::default(), icmp_data: None, + process_id: 0, }; //测试长度不正确 diff --git a/agent/src/platform/platform_synchronizer/linux_process.rs b/agent/src/platform/platform_synchronizer/linux_process.rs index 28b26162f41..5a5865ceaed 100644 --- a/agent/src/platform/platform_synchronizer/linux_process.rs +++ b/agent/src/platform/platform_synchronizer/linux_process.rs @@ -63,6 +63,8 @@ pub struct ProcessData { pub netns_id: u32, // pod container id in kubernetes pub container_id: String, + // business type, e.g. 
BIZ_TYPE_AI_AGENT = 1 + pub biz_type: u8, } impl ProcessDataOp for Vec { @@ -195,6 +197,7 @@ impl TryFrom<&Process> for ProcessData { os_app_tags: vec![], netns_id: get_proc_netns(proc).unwrap_or_default() as u32, container_id: get_container_id(proc).unwrap_or("".to_string()), + biz_type: 0, }) } } @@ -221,6 +224,7 @@ impl From<&ProcessData> for ProcessInfo { }, netns_id: Some(p.netns_id), container_id: Some(p.container_id.clone()), + biz_type: Some(p.biz_type as u32), } } } @@ -364,7 +368,7 @@ pub(crate) fn get_all_process_in(conf: &OsProcScanConfig, ret: &mut Vec ProcResult { @@ -565,6 +569,7 @@ mod test { }], netns_id: 1, container_id: "".into(), + biz_type: 0, }, ProcessData { name: "parent".into(), @@ -582,6 +587,7 @@ mod test { }], netns_id: 1, container_id: "".into(), + biz_type: 0, }, ProcessData { name: "child".into(), @@ -599,6 +605,7 @@ mod test { }], netns_id: 1, container_id: "".into(), + biz_type: 0, }, ProcessData { name: "other".into(), @@ -616,6 +623,7 @@ mod test { }], netns_id: 1, container_id: "".into(), + biz_type: 0, }, ]; diff --git a/agent/src/platform/platform_synchronizer/proc_scan_hook.rs b/agent/src/platform/platform_synchronizer/proc_scan_hook.rs index 05b9493681c..3a150496a29 100644 --- a/agent/src/platform/platform_synchronizer/proc_scan_hook.rs +++ b/agent/src/platform/platform_synchronizer/proc_scan_hook.rs @@ -16,20 +16,47 @@ use super::ProcessData; -pub fn proc_scan_hook(_: &mut Vec) { - // the hook logic here +pub fn proc_scan_hook(_proc_root: &str, _process_datas: &mut Vec) { + // Enterprise: clean dead AI Agent PIDs and mark alive ones with biz_type + #[cfg(feature = "enterprise")] + { + use std::collections::HashSet; - /* - use super::get_self_proc; + if let Some(registry) = enterprise_utils::ai_agent::global_registry() { + // Use a full /proc scan for cleanup to avoid filtering out short-lived processes + // that are not yet eligible for os_proc_socket_min_lifetime. 
+ let alive_pids: Vec = match procfs::process::all_processes_with_root(_proc_root) { + Ok(procs) => procs + .into_iter() + .filter_map(|p| p.ok()) + .map(|p| p.pid as u32) + .collect(), + Err(_) => _process_datas.iter().map(|pd| pd.pid as u32).collect(), + }; + registry.cleanup_dead_pids(&alive_pids); - let Ok(self_proc) = get_self_proc() else{ - return - }; + for pd in _process_datas.iter_mut() { + if registry.is_ai_agent(pd.pid as u32) { + pd.biz_type = crate::common::flow::BIZ_TYPE_AI_AGENT; + } + } - info!("self proc: {:#?}",self_proc); - - for i in _.iter_mut() { - // handle every process + // Inject AI agent processes that weren't matched by process_matcher. + // Without this, identified AI agents appear in l7_flow_log but NOT in the + // MySQL process table because process_matcher only matches on socket/regex. + let existing_pids: HashSet = + _process_datas.iter().map(|pd| pd.pid as u32).collect(); + for pid in registry.get_all_pids() { + if existing_pids.contains(&pid) { + continue; + } + if let Ok(proc) = procfs::process::Process::new(pid as i32) { + if let Ok(mut pd) = ProcessData::try_from(&proc) { + pd.biz_type = crate::common::flow::BIZ_TYPE_AI_AGENT; + _process_datas.push(pd); + } + } + } } - */ + } } diff --git a/agent/src/plugin/shared_obj/test.rs b/agent/src/plugin/shared_obj/test.rs index 79c47ba739e..adf1a19ad52 100644 --- a/agent/src/plugin/shared_obj/test.rs +++ b/agent/src/plugin/shared_obj/test.rs @@ -92,6 +92,7 @@ fn get_req_param<'a>( iso8583_parse_conf: Iso8583ParseConfig::default(), web_sphere_mq_parse_conf: WebSphereMqParseConfig::default(), icmp_data: None, + process_id: 0, } } @@ -132,6 +133,7 @@ fn get_resp_param<'a>( iso8583_parse_conf: Iso8583ParseConfig::default(), web_sphere_mq_parse_conf: WebSphereMqParseConfig::default(), icmp_data: None, + process_id: 0, } } diff --git a/agent/src/plugin/wasm/test.rs b/agent/src/plugin/wasm/test.rs index 35e8b659c19..72b2c5a1c8d 100644 --- a/agent/src/plugin/wasm/test.rs +++ 
b/agent/src/plugin/wasm/test.rs @@ -84,6 +84,7 @@ fn get_req_param<'a>( iso8583_parse_conf: Iso8583ParseConfig::default(), web_sphere_mq_parse_conf: WebSphereMqParseConfig::default(), icmp_data: None, + process_id: 0, } } @@ -125,6 +126,7 @@ fn get_resq_param<'a>( iso8583_parse_conf: Iso8583ParseConfig::default(), web_sphere_mq_parse_conf: WebSphereMqParseConfig::default(), icmp_data: None, + process_id: 0, } } diff --git a/agent/src/trident.rs b/agent/src/trident.rs index 99bb35f6401..8ffc1665d25 100644 --- a/agent/src/trident.rs +++ b/agent/src/trident.rs @@ -920,6 +920,12 @@ impl Trident { #[cfg(feature = "enterprise")] Trident::kernel_version_check(&state, &exception_handler); + #[cfg(feature = "enterprise")] + { + let _ai_agent_registry = enterprise_utils::ai_agent::init_global_registry(); + info!("AI Agent governance registry initialized"); + } + let mut components: Option = None; let mut first_run = true; let mut config_initialized = false; diff --git a/agent/src/utils/process/linux.rs b/agent/src/utils/process/linux.rs index 1778ceb89b1..2c13c3fd9ad 100644 --- a/agent/src/utils/process/linux.rs +++ b/agent/src/utils/process/linux.rs @@ -34,6 +34,7 @@ use log::{debug, error, info, trace}; use nix::sys::utsname::uname; use procfs::process::all_processes_with_root; +use crate::common::flow::BIZ_TYPE_AI_AGENT; use crate::config::ProcessMatcher; use crate::platform::{get_os_app_tag_by_exec, ProcessData, ProcessDataOp}; @@ -476,9 +477,14 @@ impl ProcessListener { process_data_cache.retain(|pid, _| alive_pids.contains(pid)); for (key, value) in features.iter_mut() { - if (value.process_matcher.is_empty() && value.pids.is_empty()) - || value.callback.is_none() - { + let ai_agent_pids = fetch_ai_agent_pids(key.as_str()); + + if should_skip_feature( + value.process_matcher.is_empty(), + value.pids.is_empty(), + ai_agent_pids.is_empty(), + value.callback.is_none(), + ) { continue; } @@ -502,6 +508,15 @@ impl ProcessListener { } } + if !ai_agent_pids.is_empty() { + 
merge_ai_agent_processes( + process_data_cache, + &ai_agent_pids, + &mut pids, + &mut process_datas, + ); + } + pids.sort(); pids.dedup(); process_datas.sort_by_key(|x| x.pid); @@ -563,3 +578,135 @@ impl ProcessListener { self.thread_handle.lock().unwrap().take() } } + +fn should_skip_feature( + process_matcher_empty: bool, + previous_pids_empty: bool, + ai_agent_pids_empty: bool, + callback_missing: bool, +) -> bool { + if callback_missing { + return true; + } + !(!process_matcher_empty || !previous_pids_empty || !ai_agent_pids_empty) +} + +#[cfg(feature = "enterprise")] +fn fetch_ai_agent_pids(feature: &str) -> Vec { + if feature == "proc.gprocess_info" { + if let Some(registry) = enterprise_utils::ai_agent::global_registry() { + return registry.get_all_pids(); + } + } + Vec::new() +} + +#[cfg(not(feature = "enterprise"))] +fn fetch_ai_agent_pids(_feature: &str) -> Vec { + Vec::new() +} + +fn merge_ai_agent_processes( + process_data_cache: &HashMap, + ai_agent_pids: &[u32], + pids: &mut Vec, + process_datas: &mut Vec, +) { + let mut existing_pids: HashSet = pids.iter().copied().collect(); + let ai_agent_set: HashSet = ai_agent_pids.iter().copied().collect(); + for process_data in process_datas.iter_mut() { + if ai_agent_set.contains(&(process_data.pid as u32)) { + process_data.biz_type = BIZ_TYPE_AI_AGENT; + } + } + for pid in ai_agent_pids { + if existing_pids.contains(pid) { + continue; + } + if let Some(process_data) = process_data_cache.get(&(*pid as i32)) { + let mut process_data = process_data.clone(); + process_data.biz_type = BIZ_TYPE_AI_AGENT; + pids.push(*pid); + process_datas.push(process_data); + existing_pids.insert(*pid); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::time::Duration; + + fn make_process_data(pid: u64) -> ProcessData { + ProcessData { + name: format!("proc-{pid}"), + pid, + ppid: 1, + process_name: format!("proc-{pid}"), + cmd: format!("/proc/{pid}"), + cmd_with_args: vec![format!("/proc/{pid}")], + user_id: 0, + 
user: "root".to_string(), + start_time: Duration::from_secs(0), + os_app_tags: vec![], + netns_id: 0, + container_id: String::new(), + biz_type: 0, + } + } + + #[test] + fn merge_ai_agent_processes_adds_missing_pids() { + let mut process_data_cache = HashMap::new(); + process_data_cache.insert(1001, make_process_data(1001)); + process_data_cache.insert(1002, make_process_data(1002)); + + let ai_agent_pids = vec![1002]; + let mut pids = vec![1001]; + let mut process_datas = vec![make_process_data(1001)]; + + merge_ai_agent_processes( + &process_data_cache, + &ai_agent_pids, + &mut pids, + &mut process_datas, + ); + + pids.sort(); + assert_eq!(pids, vec![1001, 1002]); + assert!(process_datas.iter().any(|pd| pd.pid == 1002)); + } + + #[test] + fn merge_ai_agent_processes_sets_biz_type() { + use crate::common::flow::BIZ_TYPE_AI_AGENT; + + let mut process_data_cache = HashMap::new(); + process_data_cache.insert(2001, make_process_data(2001)); + process_data_cache.insert(2002, make_process_data(2002)); + + let ai_agent_pids = vec![2002]; + let mut pids = vec![2002]; + let mut process_datas = vec![make_process_data(2002)]; + + merge_ai_agent_processes( + &process_data_cache, + &ai_agent_pids, + &mut pids, + &mut process_datas, + ); + + let ai_agent = process_datas + .iter() + .find(|pd| pd.pid == 2002) + .expect("ai agent process missing"); + assert_eq!(ai_agent.biz_type, BIZ_TYPE_AI_AGENT); + } + + #[test] + fn should_skip_feature_allows_ai_agent_without_matcher() { + let skip = should_skip_feature(true, true, false, false); + assert!(!skip); + } +} diff --git a/message/agent.proto b/message/agent.proto index 37d7e44c453..38c8863a7db 100644 --- a/message/agent.proto +++ b/message/agent.proto @@ -550,6 +550,7 @@ message ProcessInfo { optional uint32 netns_id = 7 [default = 0]; optional string container_id = 8 [default = ""]; repeated Tag os_app_tags = 11; + optional uint32 biz_type = 12; } message GenesisProcessData { diff --git a/message/controller.proto 
b/message/controller.proto index 217db023e37..68eabe9f4ac 100644 --- a/message/controller.proto +++ b/message/controller.proto @@ -136,6 +136,7 @@ message GenesisSyncProcess { optional string start_time = 10; optional uint32 netns_id = 11; optional string container_id = 12; + optional uint32 biz_type = 13; } message GenesisSyncData{ diff --git a/message/metric.proto b/message/metric.proto index 1c5754beae6..e6b4e468348 100644 --- a/message/metric.proto +++ b/message/metric.proto @@ -257,11 +257,68 @@ message IoEventData { bytes mount_point= 7; bytes file_dir = 8; FileType file_type = 9; + uint32 access_permission = 10; // File permission bits (inode->i_mode & 0xFFF) } enum EventType { OtherEvent = 0; IoEvent = 1; + FileOpEvent = 2; // File creation/deletion/chmod/chown + PermOpEvent = 3; // setuid/setgid/setreuid/setregid + ProcLifecycleEvent = 4; // fork/exec/exit +} + +enum FileOpType { + FileOpUnknown = 0; + FileOpCreate = 1; + FileOpDelete = 2; + FileOpChmod = 3; + FileOpChown = 4; +} + +message FileOpEventData { + FileOpType op_type = 1; + uint32 pid = 2; + uint32 uid = 3; + uint32 gid = 4; + uint32 mode = 5; + uint64 timestamp = 6; + bytes filename = 7; +} + +enum PermOpType { + PermOpUnknown = 0; + PermOpSetuid = 1; + PermOpSetgid = 2; + PermOpSetreuid = 3; + PermOpSetregid = 4; +} + +message PermOpEventData { + PermOpType op_type = 1; + uint32 pid = 2; + uint32 old_uid = 3; + uint32 old_gid = 4; + uint32 new_uid = 5; + uint32 new_gid = 6; + uint64 timestamp = 7; +} + +enum ProcLifecycleType { + ProcLifecycleUnknown = 0; + ProcLifecycleFork = 1; + ProcLifecycleExec = 2; + ProcLifecycleExit = 3; +} + +message ProcLifecycleEventData { + ProcLifecycleType lifecycle_type = 1; + uint32 pid = 2; + uint32 parent_pid = 3; + uint32 uid = 4; + uint32 gid = 5; + uint64 timestamp = 6; + bytes comm = 7; } message ProcEvent { @@ -275,6 +332,9 @@ message ProcEvent { IoEventData io_event_data = 8; // Deprecated in v6.4.1: uint32 netns_id = 9; uint32 pod_id = 10; + 
FileOpEventData file_op_event_data = 11; + PermOpEventData perm_op_event_data = 12; + ProcLifecycleEventData proc_lifecycle_event_data = 13; } message PrometheusMetric { diff --git a/server/agent_config/template.yaml b/server/agent_config/template.yaml index c1c4aae0f0e..76b4da85189 100644 --- a/server/agent_config/template.yaml +++ b/server/agent_config/template.yaml @@ -1666,6 +1666,60 @@ inputs: enabled_features: [proc.gprocess_info] # type: section # name: + # en: AI Agent + # ch: 智能体治理 + # description: + ai_agent: + # type: string + # name: + # en: HTTP Endpoints + # ch: HTTP 端点 + # unit: + # range: [] + # enum_options: [] + # modification: hot_update + # ee_feature: true + # description: + # en: |- + # HTTP endpoints for AI agent recognition. Requests that match any prefix will mark the process as AI Agent. + # ch: |- + # 用于识别智能体的 HTTP 端点前缀,命中后会标记进程为 AI Agent。 + http_endpoints: + - /v1/chat/completions + - /v1/embeddings + - /v1/responses + # type: int + # name: + # en: Max Payload Size + # ch: 最大载荷大小 + # unit: byte + # range: [0, 2147483647] + # enum_options: [] + # modification: hot_update + # ee_feature: true + # description: + # en: |- + # Maximum payload size for AI agent reassembly. 0 means unlimited. + # ch: |- + # AI Agent 流重组最大载荷大小,0 表示不限。 + max_payload_size: 0 + # type: bool + # name: + # en: File IO Enabled + # ch: 文件 IO 事件 + # unit: + # range: [] + # enum_options: [] + # modification: hot_update + # ee_feature: true + # description: + # en: |- + # Whether to enable AI Agent file IO event collection. 
+ # ch: |- + # 是否开启 AI Agent 文件 IO 事件采集。 + file_io_enabled: true + # type: section + # name: # en: Symbol Table # ch: 符号表 # description: diff --git a/server/controller/cloud/cloud.go b/server/controller/cloud/cloud.go index df42d04ee56..bc40f14186c 100644 --- a/server/controller/cloud/cloud.go +++ b/server/controller/cloud/cloud.go @@ -244,6 +244,10 @@ func (c *Cloud) GetResource() model.Resource { cResource = c.getKubernetesData() } if !cResource.Verified { + if c.basicInfo.Type == common.KUBERNETES { + cResource = c.appendResourceProcess(cResource) + return cResource + } return model.Resource{ ErrorState: cResource.ErrorState, ErrorMessage: cResource.ErrorMessage, @@ -693,18 +697,22 @@ func (c *Cloud) appendResourceProcess(resource model.Resource) model.Resource { PID: sProcess.PID, NetnsID: sProcess.NetnsID, ProcessName: processName, + BizType: sProcess.BizType, CommandLine: sProcess.CMDLine, UserName: sProcess.UserName, ContainerID: sProcess.ContainerID, StartTime: sProcess.StartTime, OSAPPTags: sProcess.OSAPPTags, } - if resource.Verified && lcuuid == "" { + if (resource.Verified || c.basicInfo.Type == common.KUBERNETES) && lcuuid == "" { resource.Processes = append(resource.Processes, process) continue } subDomainResource, ok := resource.SubDomainResources[lcuuid] if !ok || !subDomainResource.Verified { + if c.basicInfo.Type == common.KUBERNETES { + resource.Processes = append(resource.Processes, process) + } continue } process.SubDomainLcuuid = lcuuid diff --git a/server/controller/cloud/model/model.go b/server/controller/cloud/model/model.go index bdf307d4367..0eda6e3e9a7 100644 --- a/server/controller/cloud/model/model.go +++ b/server/controller/cloud/model/model.go @@ -680,6 +680,7 @@ type Process struct { VTapID uint32 `json:"vtap_id" binding:"required"` PID uint64 `json:"pid" binding:"required"` ProcessName string `json:"process_name" binding:"required"` + BizType int `json:"biz_type"` CommandLine string `json:"command_line"` UserName string 
`json:"user_name"` StartTime time.Time `json:"start_time" binding:"required"` diff --git a/server/controller/db/metadb/migrator/schema/const.go b/server/controller/db/metadb/migrator/schema/const.go index 4e0e57afb2e..13815e41a29 100644 --- a/server/controller/db/metadb/migrator/schema/const.go +++ b/server/controller/db/metadb/migrator/schema/const.go @@ -20,5 +20,5 @@ const ( RAW_SQL_ROOT_DIR = "/etc/metadb/schema/rawsql" DB_VERSION_TABLE = "db_version" - DB_VERSION_EXPECTED = "7.1.0.33" + DB_VERSION_EXPECTED = "7.1.0.35" ) diff --git a/server/controller/db/metadb/migrator/schema/rawsql/mysql/ddl_create_table.sql b/server/controller/db/metadb/migrator/schema/rawsql/mysql/ddl_create_table.sql index ffb6aacfd47..9bec663a20c 100644 --- a/server/controller/db/metadb/migrator/schema/rawsql/mysql/ddl_create_table.sql +++ b/server/controller/db/metadb/migrator/schema/rawsql/mysql/ddl_create_table.sql @@ -1256,6 +1256,7 @@ CREATE TABLE IF NOT EXISTS process ( vm_id INTEGER, epc_id INTEGER, process_name TEXT, + biz_type INTEGER DEFAULT 0, command_line TEXT, user_name VARCHAR(256) DEFAULT '', start_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, @@ -1492,6 +1493,7 @@ CREATE TABLE IF NOT EXISTS genesis_process ( lcuuid CHAR(64) DEFAULT '', name TEXT, process_name TEXT, + biz_type INTEGER DEFAULT 0, cmd_line TEXT, user_name VARCHAR(256) DEFAULT '', container_id CHAR(64) DEFAULT '', @@ -1759,6 +1761,7 @@ CREATE TABLE IF NOT EXISTS ch_gprocess ( icon_id INTEGER, chost_id INTEGER, l3_epc_id INTEGER, + biz_type INTEGER, team_id INTEGER, domain_id INTEGER, sub_domain_id INTEGER, diff --git a/server/controller/db/metadb/migrator/schema/rawsql/mysql/issu/7.1.0.34.sql b/server/controller/db/metadb/migrator/schema/rawsql/mysql/issu/7.1.0.34.sql new file mode 100644 index 00000000000..a39082f9a1a --- /dev/null +++ b/server/controller/db/metadb/migrator/schema/rawsql/mysql/issu/7.1.0.34.sql @@ -0,0 +1,32 @@ +DROP PROCEDURE IF EXISTS AddColumnIfNotExists; + +CREATE PROCEDURE 
AddColumnIfNotExists( + IN tableName VARCHAR(255), + IN colName VARCHAR(255), + IN colType VARCHAR(255), + IN afterCol VARCHAR(255) +) +BEGIN + DECLARE column_count INT; + + SELECT COUNT(*) + INTO column_count + FROM information_schema.columns + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = tableName + AND column_name = colName; + + IF column_count = 0 THEN + SET @sql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN ', colName, ' ', colType, ' AFTER ', afterCol); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END IF; +END; + +CALL AddColumnIfNotExists('process', 'biz_type', 'INTEGER DEFAULT 0', 'process_name'); +CALL AddColumnIfNotExists('genesis_process', 'biz_type', 'INTEGER DEFAULT 0', 'process_name'); + +DROP PROCEDURE AddColumnIfNotExists; + +UPDATE db_version SET version='7.1.0.34'; diff --git a/server/controller/db/metadb/migrator/schema/rawsql/mysql/issu/7.1.0.35.sql b/server/controller/db/metadb/migrator/schema/rawsql/mysql/issu/7.1.0.35.sql new file mode 100644 index 00000000000..9f9aea5a6dc --- /dev/null +++ b/server/controller/db/metadb/migrator/schema/rawsql/mysql/issu/7.1.0.35.sql @@ -0,0 +1,31 @@ +DROP PROCEDURE IF EXISTS AddColumnIfNotExists; + +CREATE PROCEDURE AddColumnIfNotExists( + IN tableName VARCHAR(255), + IN colName VARCHAR(255), + IN colType VARCHAR(255), + IN afterCol VARCHAR(255) +) +BEGIN + DECLARE column_count INT; + + SELECT COUNT(*) + INTO column_count + FROM information_schema.columns + WHERE TABLE_SCHEMA = DATABASE() + AND TABLE_NAME = tableName + AND column_name = colName; + + IF column_count = 0 THEN + SET @sql = CONCAT('ALTER TABLE ', tableName, ' ADD COLUMN ', colName, ' ', colType, ' AFTER ', afterCol); + PREPARE stmt FROM @sql; + EXECUTE stmt; + DEALLOCATE PREPARE stmt; + END IF; +END; + +CALL AddColumnIfNotExists('ch_gprocess', 'biz_type', 'INTEGER DEFAULT 0', 'l3_epc_id'); + +DROP PROCEDURE AddColumnIfNotExists; + +UPDATE db_version SET version='7.1.0.35'; diff --git 
a/server/controller/db/metadb/migrator/schema/rawsql/postgres/ddl_create_table.sql b/server/controller/db/metadb/migrator/schema/rawsql/postgres/ddl_create_table.sql index f0965b29919..f18ea0ba762 100644 --- a/server/controller/db/metadb/migrator/schema/rawsql/postgres/ddl_create_table.sql +++ b/server/controller/db/metadb/migrator/schema/rawsql/postgres/ddl_create_table.sql @@ -1369,6 +1369,7 @@ CREATE TABLE IF NOT EXISTS process ( vm_id INTEGER, epc_id INTEGER, process_name TEXT, + biz_type INTEGER DEFAULT 0, command_line TEXT, user_name VARCHAR(256) DEFAULT '', start_time TIMESTAMP NOT NULL DEFAULT NOW(), @@ -1615,6 +1616,7 @@ CREATE TABLE IF NOT EXISTS genesis_process ( lcuuid VARCHAR(64) DEFAULT '', name TEXT, process_name TEXT, + biz_type INTEGER DEFAULT 0, cmd_line TEXT, user_name VARCHAR(256) DEFAULT '', container_id VARCHAR(64) DEFAULT '', @@ -1922,6 +1924,7 @@ CREATE TABLE IF NOT EXISTS ch_gprocess ( icon_id INTEGER, chost_id INTEGER, l3_epc_id INTEGER, + biz_type INTEGER, team_id INTEGER, domain_id INTEGER, sub_domain_id INTEGER, diff --git a/server/controller/db/metadb/model/ch_model.go b/server/controller/db/metadb/model/ch_model.go index b05b5cb9ee5..020d2e059ac 100644 --- a/server/controller/db/metadb/model/ch_model.go +++ b/server/controller/db/metadb/model/ch_model.go @@ -469,6 +469,7 @@ type ChGProcess struct { IconID int `gorm:"column:icon_id;type:int;default:null" json:"ICON_ID"` CHostID int `gorm:"column:chost_id;type:int;not null" json:"CHOST_ID"` L3EPCID int `gorm:"column:l3_epc_id;type:int" json:"L3_EPC_ID"` + BizType int `gorm:"column:biz_type;type:int;default:0" json:"BIZ_TYPE"` TeamID int `gorm:"column:team_id;type:int;not null" json:"TEAM_ID"` DomainID int `gorm:"column:domain_id;type:int;not null" json:"DOMAIN_ID"` SubDomainID int `gorm:"column:sub_domain_id;type:int;default:0" json:"SUB_DOMAIN_ID"` diff --git a/server/controller/db/metadb/model/platform_rsc_model.go b/server/controller/db/metadb/model/platform_rsc_model.go index 
3f9e007d919..033a0128ddb 100644 --- a/server/controller/db/metadb/model/platform_rsc_model.go +++ b/server/controller/db/metadb/model/platform_rsc_model.go @@ -94,6 +94,7 @@ type Process struct { VMID int `gorm:"column:vm_id;type:int;default:null" json:"VM_ID" mapstructure:"VM_ID"` VPCID int `gorm:"column:epc_id;type:int;default:null" json:"EPC_ID" mapstructure:"EPC_ID"` ProcessName string `gorm:"column:process_name;type:varchar(256);default:''" json:"PROCESS_NAME" mapstructure:"PROCESS_NAME"` + BizType int `gorm:"column:biz_type;type:int;default:0" json:"BIZ_TYPE" mapstructure:"BIZ_TYPE"` CommandLine string `gorm:"column:command_line;type:text" json:"COMMAND_LINE" mapstructure:"CMD_LINE"` UserName string `gorm:"column:user_name;type:varchar(256);default:''" json:"USER_NAME" mapstructure:"USER_NAME"` StartTime time.Time `gorm:"autoCreateTime;column:start_time;type:datetime" json:"START_TIME" mapstructure:"START_TIME"` diff --git a/server/controller/genesis/grpc/server.go b/server/controller/genesis/grpc/server.go index 008ddc168cc..5107bda547e 100644 --- a/server/controller/genesis/grpc/server.go +++ b/server/controller/genesis/grpc/server.go @@ -622,6 +622,7 @@ func (g *SynchronizerServer) GenesisSharingSync(ctx context.Context, request *co for _, p := range gSyncData.Processes { pData := p pStartTime := pData.StartTime.Format(controllercommon.GO_BIRTHDAY) + bizType := uint32(pData.BizType) gProcess := &controller.GenesisSyncProcess{ VtapId: &pData.VtapID, Pid: &pData.PID, @@ -629,6 +630,7 @@ func (g *SynchronizerServer) GenesisSharingSync(ctx context.Context, request *co NetnsId: &pData.NetnsID, Name: &pData.Name, ProcessName: &pData.ProcessName, + BizType: &bizType, CmdLine: &pData.CMDLine, User: &pData.UserName, ContainerId: &pData.ContainerID, diff --git a/server/controller/genesis/store/sync/mysql/run.go b/server/controller/genesis/store/sync/mysql/run.go index ce1c1916d9d..0341ff35cdc 100644 --- a/server/controller/genesis/store/sync/mysql/run.go +++ 
b/server/controller/genesis/store/sync/mysql/run.go @@ -384,6 +384,7 @@ func (g *GenesisSync) GetGenesisSyncResponse(orgID int) (common.GenesisSyncDataR NetnsID: p.GetNetnsId(), Name: p.GetName(), ProcessName: p.GetProcessName(), + BizType: int(p.GetBizType()), CMDLine: p.GetCmdLine(), ContainerID: p.GetContainerId(), UserName: p.GetUser(), diff --git a/server/controller/genesis/updater/sync.go b/server/controller/genesis/updater/sync.go index b546aea0a90..485d604ff67 100644 --- a/server/controller/genesis/updater/sync.go +++ b/server/controller/genesis/updater/sync.go @@ -539,6 +539,7 @@ func (v *GenesisSyncRpcUpdater) ParseProcessInfo(orgID int, vtapID uint32, messa NetnsID: p.GetNetnsId(), Name: p.GetName(), ProcessName: p.GetProcessName(), + BizType: int(p.GetBizType()), CMDLine: p.GetCmdline(), UserName: p.GetUser(), ContainerID: p.GetContainerId(), diff --git a/server/controller/model/model.go b/server/controller/model/model.go index 8dd7d4ab8da..a8957356514 100644 --- a/server/controller/model/model.go +++ b/server/controller/model/model.go @@ -752,6 +752,7 @@ type GenesisProcess struct { Lcuuid string `gorm:"primaryKey;column:lcuuid;type:char(64)" json:"LCUUID"` Name string `gorm:"column:name;type:text;default:null" json:"NAME"` ProcessName string `gorm:"column:process_name;type:text;default:null" json:"PROCESS_NAME"` + BizType int `gorm:"column:biz_type;type:int;default:0" json:"BIZ_TYPE"` CMDLine string `gorm:"column:cmd_line;type:text;default:null" json:"CMD_LINE"` ContainerID string `gorm:"column:container_id;type:char(64);default:''" json:"CONTAINER_ID"` UserName string `gorm:"column:user_name;type:varchar(256);default:null" json:"USER"` diff --git a/server/controller/recorder/cache/diffbase/process.go b/server/controller/recorder/cache/diffbase/process.go index 91c7afa7c99..5b2ba9c6049 100644 --- a/server/controller/recorder/cache/diffbase/process.go +++ b/server/controller/recorder/cache/diffbase/process.go @@ -34,6 +34,7 @@ func (b *DataSet) 
AddProcess(dbItem *metadbmodel.Process, seq int) { ContainerID: dbItem.ContainerID, DeviceType: dbItem.DeviceType, DeviceID: dbItem.DeviceID, + BizType: dbItem.BizType, } b.GetLogFunc()(addDiffBase(ctrlrcommon.RESOURCE_TYPE_PROCESS_EN, b.Process[dbItem.Lcuuid]), b.metadata.LogPrefixes) } @@ -50,12 +51,14 @@ type Process struct { ContainerID string `json:"container_id"` DeviceType int `json:"device_type"` DeviceID int `json:"device_id"` + BizType int `json:"biz_type"` } func (p *Process) Update(cloudItem *cloudmodel.Process, toolDataSet *tool.DataSet) { p.Name = cloudItem.Name p.OSAPPTags = cloudItem.OSAPPTags p.ContainerID = cloudItem.ContainerID + p.BizType = cloudItem.BizType deviceType, deviceID := toolDataSet.GetProcessDeviceTypeAndID(cloudItem.ContainerID, cloudItem.VTapID) if p.DeviceType != deviceType || p.DeviceID != deviceID { p.DeviceType = deviceType diff --git a/server/controller/recorder/domain.go b/server/controller/recorder/domain.go index a8ae8fa52a5..7616cdfbf9b 100644 --- a/server/controller/recorder/domain.go +++ b/server/controller/recorder/domain.go @@ -123,6 +123,11 @@ func (d *domain) tryRefresh(cloudData cloudmodel.Resource) error { d.updateStateInfo(cloudData) if err := d.shouldRefresh(cloudData); err != nil { + if err == DataNotVerifiedError && + len(cloudData.Processes) > 0 && + d.metadata.GetDomainInfo().Type == common.KUBERNETES { + return d.tryRefreshProcessOnly(cloudData) + } return err } @@ -141,6 +146,41 @@ func (d *domain) tryRefresh(cloudData cloudmodel.Resource) error { } } +func (d *domain) tryRefreshProcessOnly(cloudData cloudmodel.Resource) error { + select { + case <-d.cache.RefreshSignal: + d.cache.IncrementSequence() + d.cache.SetLogLevel(logging.INFO, cache.RefreshSignalCallerDomain) + + d.refreshProcessOnly(cloudData) + + d.cache.ResetRefreshSignal(cache.RefreshSignalCallerDomain) + return nil + default: + log.Info("domain refresh is running, does nothing", d.metadata.LogPrefixes) + return RefreshConflictError + } +} + 
+func (d *domain) refreshProcessOnly(cloudData cloudmodel.Resource) { + log.Info("domain process-only refresh started", d.metadata.LogPrefixes) + + // for process + d.cache.RefreshVTaps() + + processUpdater := updater.NewProcess(d.cache, cloudData.Processes).RegisterListener( + listener.NewProcess(d.cache), + ) + updaters := []updater.ResourceUpdater{processUpdater} + d.executeUpdaters(updaters) + d.notifyOnResourceChanged(updaters) + d.pubsub.PublishChange(d.msgMetadata) + + d.updateSyncedAt(cloudData.SyncAt) + + log.Info("domain process-only refresh completed", d.metadata.LogPrefixes) +} + func (d *domain) shouldRefresh(cloudData cloudmodel.Resource) error { if cloudData.Verified { if (!slices.Contains(rcommon.UNCHECK_NETWORK_DOMAINS, d.metadata.GetDomainInfo().Type) && len(cloudData.Networks) == 0) || len(cloudData.VInterfaces) == 0 { @@ -273,14 +313,26 @@ func (r *domain) executeUpdaters(updatersInUpdateOrder []updater.ResourceUpdater updater.HandleAddAndUpdate() } + if len(updatersInUpdateOrder) == 0 { + return + } + + if len(updatersInUpdateOrder) == 1 { + updatersInUpdateOrder[0].HandleDelete() + return + } + // 删除操作的顺序,是创建的逆序 // 特殊资源:VMPodNodeConnection虽然是末序创建,但需要末序删除,序号-1; // 原因:避免数据量大时,此数据删除后,云主机、容器节点还在,导致采集器类型变化 processUpdater := updatersInUpdateOrder[len(updatersInUpdateOrder)-1] vmPodNodeConnectionUpdater := updatersInUpdateOrder[len(updatersInUpdateOrder)-2] - // 因为 processUpdater 是 -1,VMPodNodeConnection 是 -2,特殊处理后,逆序删除从 -3 开始 - for i := len(updatersInUpdateOrder) - 3; i >= 0; i-- { - updatersInUpdateOrder[i].HandleDelete() + + if len(updatersInUpdateOrder) > 2 { + // 因为 processUpdater 是 -1,VMPodNodeConnection 是 -2,特殊处理后,逆序删除从 -3 开始 + for i := len(updatersInUpdateOrder) - 3; i >= 0; i-- { + updatersInUpdateOrder[i].HandleDelete() + } } processUpdater.HandleDelete() vmPodNodeConnectionUpdater.HandleDelete() diff --git a/server/controller/recorder/pubsub/message/update.go b/server/controller/recorder/pubsub/message/update.go index 
ef32538d5d7..f8301ab673b 100644 --- a/server/controller/recorder/pubsub/message/update.go +++ b/server/controller/recorder/pubsub/message/update.go @@ -665,6 +665,7 @@ type UpdatedProcessFields struct { ProcessName fieldDetail[string] ContainerID fieldDetail[string] OSAPPTags fieldDetail[string] + BizType fieldDetail[int] VMID fieldDetail[int] VPCID fieldDetail[int] GID fieldDetail[uint32] diff --git a/server/controller/recorder/updater/process.go b/server/controller/recorder/updater/process.go index 945eae749df..270ad05d807 100644 --- a/server/controller/recorder/updater/process.go +++ b/server/controller/recorder/updater/process.go @@ -121,6 +121,7 @@ func (p *Process) generateDBItemToAdd(cloudItem *cloudmodel.Process) (*metadbmod VTapID: cloudItem.VTapID, PID: cloudItem.PID, ProcessName: cloudItem.ProcessName, + BizType: cloudItem.BizType, CommandLine: cloudItem.CommandLine, StartTime: cloudItem.StartTime, UserName: cloudItem.UserName, @@ -156,6 +157,10 @@ func (p *Process) generateUpdateInfo(diffBase *diffbase.Process, cloudItem *clou mapInfo["os_app_tags"] = cloudItem.OSAPPTags structInfo.OSAPPTags.Set(diffBase.OSAPPTags, cloudItem.OSAPPTags) } + if diffBase.BizType != cloudItem.BizType { + mapInfo["biz_type"] = cloudItem.BizType + structInfo.BizType.Set(diffBase.BizType, cloudItem.BizType) + } if diffBase.ContainerID != cloudItem.ContainerID { mapInfo["container_id"] = cloudItem.ContainerID structInfo.ContainerID.Set(diffBase.ContainerID, cloudItem.ContainerID) diff --git a/server/controller/tagrecorder/ch_gprocess.go b/server/controller/tagrecorder/ch_gprocess.go index 3508d7de741..c7a2f724842 100644 --- a/server/controller/tagrecorder/ch_gprocess.go +++ b/server/controller/tagrecorder/ch_gprocess.go @@ -81,6 +81,7 @@ func (c *ChGProcess) sourceToTarget(md *message.Metadata, source *metadbmodel.Pr Name: sourceName, CHostID: source.VMID, L3EPCID: source.VPCID, + BizType: source.BizType, IconID: iconID, TeamID: md.GetTeamID(), DomainID: md.GetDomainID(), diff 
--git a/server/controller/tagrecorder/const_sql.go b/server/controller/tagrecorder/const_sql.go index a1f8eb4642c..2a8fb016cfa 100644 --- a/server/controller/tagrecorder/const_sql.go +++ b/server/controller/tagrecorder/const_sql.go @@ -222,6 +222,7 @@ const ( " `icon_id` Int64,\n" + " `chost_id` Int64,\n" + " `l3_epc_id` Int64,\n" + + " `biz_type` UInt64,\n" + " `team_id` UInt64,\n" + " `domain_id` UInt64,\n" + " `sub_domain_id` UInt64\n" + diff --git a/server/ingester/event/dbwriter/event.go b/server/ingester/event/dbwriter/event.go index 5f921048095..7bd2ab121b8 100644 --- a/server/ingester/event/dbwriter/event.go +++ b/server/ingester/event/dbwriter/event.go @@ -113,6 +113,7 @@ type EventStore struct { MountSource string `json:"mount_source" category:"$tag" sub:"event_info"` MountPoint string `json:"mount_point" category:"$tag" sub:"event_info"` FileDir string `json:"file_dir" category:"$tag" sub:"event_info"` + AccessPermission uint32 `json:"access_permission" category:"$tag" sub:"event_info"` } func (e *EventStore) NativeTagVersion() uint32 { @@ -238,6 +239,7 @@ func EventColumns(isFileEvent bool) []*ckdb.Column { ckdb.NewColumn("mount_source", ckdb.LowCardinalityString).SetGroupBy(), ckdb.NewColumn("mount_point", ckdb.LowCardinalityString).SetGroupBy(), ckdb.NewColumn("file_dir", ckdb.String).SetGroupBy(), + ckdb.NewColumn("access_permission", ckdb.UInt32).SetComment("文件权限位").SetIgnoredInAggrTable(), ) } return columns diff --git a/server/ingester/event/dbwriter/event_column_block.go b/server/ingester/event/dbwriter/event_column_block.go index ac86e916cb5..243031b5fc3 100644 --- a/server/ingester/event/dbwriter/event_column_block.go +++ b/server/ingester/event/dbwriter/event_column_block.go @@ -72,6 +72,7 @@ type EventBlock struct { ColMountSource *proto.ColLowCardinality[string] ColMountPoint *proto.ColLowCardinality[string] ColFileDir proto.ColStr + ColAccessPermission proto.ColUInt32 *nativetag.NativeTagsBlock } @@ -122,6 +123,7 @@ func (b *EventBlock) 
Reset() { b.ColMountSource.Reset() b.ColMountPoint.Reset() b.ColFileDir.Reset() + b.ColAccessPermission.Reset() if b.NativeTagsBlock != nil { b.NativeTagsBlock.Reset() } @@ -177,6 +179,7 @@ func (b *EventBlock) ToInput(input proto.Input) proto.Input { proto.InputColumn{Name: ckdb.COLUMN_MOUNT_SOURCE, Data: b.ColMountSource}, proto.InputColumn{Name: ckdb.COLUMN_MOUNT_POINT, Data: b.ColMountPoint}, proto.InputColumn{Name: ckdb.COLUMN_FILE_DIR, Data: &b.ColFileDir}, + proto.InputColumn{Name: ckdb.COLUMN_ACCESS_PERMISSION, Data: &b.ColAccessPermission}, ) } if b.NativeTagsBlock != nil { @@ -249,6 +252,7 @@ func (n *EventStore) AppendToColumnBlock(b ckdb.CKColumnBlock) { block.ColMountSource.Append(n.MountSource) block.ColMountPoint.Append(n.MountPoint) block.ColFileDir.Append(n.FileDir) + block.ColAccessPermission.Append(n.AccessPermission) if block.NativeTagsBlock != nil { block.NativeTagsBlock.AppendToColumnBlock(n.AttributeNames, n.AttributeValues, nil, nil) diff --git a/server/ingester/event/decoder/decoder.go b/server/ingester/event/decoder/decoder.go index 4745a82d674..d6836f990d7 100644 --- a/server/ingester/event/decoder/decoder.go +++ b/server/ingester/event/decoder/decoder.go @@ -195,7 +195,27 @@ func (d *Decoder) WriteFileEvent(vtapId uint16, e *pb.ProcEvent) { s.MountSource = string(ioData.MountSource) s.MountPoint = string(ioData.MountPoint) s.Bytes = ioData.BytesCount + s.AccessPermission = ioData.AccessPermission s.Duration = uint64(s.EndTime - s.StartTime) + } else if e.FileOpEventData != nil { + d := e.FileOpEventData + s.EventType = strings.ToLower(d.OpType.String()) + s.ProcessKName = string(e.ProcessKname) + s.FileName = string(d.Filename) + s.SyscallThread = e.ThreadId + s.SyscallCoroutine = e.CoroutineId + } else if e.PermOpEventData != nil { + d := e.PermOpEventData + s.EventType = strings.ToLower(d.OpType.String()) + s.ProcessKName = string(e.ProcessKname) + s.SyscallThread = e.ThreadId + s.SyscallCoroutine = e.CoroutineId + } else if 
e.ProcLifecycleEventData != nil { + d := e.ProcLifecycleEventData + s.EventType = strings.ToLower(d.LifecycleType.String()) + s.ProcessKName = string(d.Comm) + s.SyscallThread = e.ThreadId + s.SyscallCoroutine = e.CoroutineId } s.VTAPID = vtapId s.L3EpcID = d.platformData.QueryVtapEpc0(s.OrgId, vtapId) diff --git a/server/libs/ckdb/column.go b/server/libs/ckdb/column.go index 4e52d2d82ac..5c66b03da6d 100644 --- a/server/libs/ckdb/column.go +++ b/server/libs/ckdb/column.go @@ -32,6 +32,7 @@ func init() { const ( COLUMN_ACL_GID = "acl_gid" COLUMN_ACL_GIDS = "acl_gids" + COLUMN_ACCESS_PERMISSION = "access_permission" COLUMN_AGENT_ID = "agent_id" COLUMN_ALERT_POLICY = "alert_policy" COLUMN_APP_INSTANCE = "app_instance"