@@ -1685,10 +1685,31 @@ def _sort_network_spans(self):
16851685 else :
16861686 sorted_spans [i ].flow ['agent_rank' ] = 1
16871687
1688- sorted_spans = sorted (
1689- sorted_spans ,
1690- key = lambda x : (x .flow ['agent_rank' ], - x .flow ['response_duration' ],
1691- x .flow ['start_time_us' ], - x .flow ['end_time_us' ]))
1688+ # 这里如果有客户端进程+服务端进程,一定分布在 tcp_seq 的两端,所以这两个 span 不参与排序
1689+ # 这里预期一个 TCP_SEQ 内最多只有一个客户端进程,最多只有一个服务端进程 span,但如果发生异常情况,比如请求-响应聚合失败,那么:
1690+ # 这里也必须兼容【最多两个客户端进程,最多两个服务端进程】在同一个 TCP_SEQ 组的情况
1691+ # 于是,这里单独对三组数据做独立排序
1692+ client_processes = []
1693+ server_processes = []
1694+ network_spans = []
1695+
1696+ for span in sorted_spans :
1697+ if span .tap_side == const .TAP_SIDE_CLIENT_PROCESS :
1698+ client_processes .append (span )
1699+ elif span .tap_side == const .TAP_SIDE_SERVER_PROCESS :
1700+ server_processes .append (span )
1701+ else :
1702+ network_spans .append (span )
1703+
1704+ def sorted_func (x ):
1705+ return (x .flow ['agent_rank' ], - x .flow ['response_duration' ],
1706+ x .flow ['start_time_us' ], - x .flow ['end_time_us' ])
1707+
1708+ client_processes = sorted (client_processes , key = sorted_func )
1709+ network_spans = sorted (network_spans , key = sorted_func )
1710+ server_processes = sorted (server_processes , key = sorted_func )
1711+
1712+ sorted_spans = client_processes + network_spans + server_processes
16921713
16931714 # 当 ingress_agent=egress_agent 时
16941715 # 如果中间穿过了其他节点数据,需要将所有 server-side span 排序到末尾
@@ -1808,6 +1829,27 @@ def time_range_cover(self,
18081829 'start_time_us' ] and self .flow ['response_duration' ] == 0
18091830 return covered
18101831
1832+ def time_full_before (self ,
1833+ other_sys_span : 'SpanNode' ,
1834+ allow_lost_resp = False ) -> bool :
1835+ '''
1836+ 这里与上面 time_range_cover 的区别是:这里要求 self 完全在 other_sys_span 之前结束
1837+ 这里其实判断 self.end_time < other.end_time 是为了避免与上述 cover 的场景混淆在一起,因为「覆盖」是更优先的
1838+ '''
1839+ before = self .flow ['start_time_us' ] <= other_sys_span .flow ['start_time_us' ] \
1840+ and self .flow ['end_time_us' ] < other_sys_span .flow ['end_time_us' ]
1841+ if allow_lost_resp and not before :
1842+ before = self .flow ['start_time_us' ] <= other_sys_span .flow ['start_time_us' ] \
1843+ and self .flow ['response_duration' ] == 0
1844+ return before
1845+
1846+ def time_start_before (self , other_sys_span : 'SpanNode' ) -> bool :
1847+ '''
1848+ 这里与上面 time_before 的区别是:这里要求 self 比较 start_time 即可
1849+ '''
1850+ return self .flow ['start_time_us' ] <= other_sys_span .flow [
1851+ 'start_time_us' ]
1852+
18111853
18121854class AppSpanNode (SpanNode ):
18131855
@@ -2263,8 +2305,8 @@ def try_attach_client_sys_span_via_sys_span(self,
22632305 # process_matched 防错: 避免 auto_instance 匹配到 host 但实际进程不同的情况
22642306 # time_range_cover: 校验 client_sys_flow 是否落入 s-p 时间范围内
22652307 # 这里假设 c-p 正常完成,但 s-p 后续有异常导致没有响应,允许时间仅校验一半,对应参数 allow_lost_resp
2266- if isinstance (span , SysSpanNode ) and \
2267- not ( span . process_matched ( client_sys_span ) and span .time_range_cover (client_sys_span , allow_lost_resp = True )):
2308+ if isinstance (span , SysSpanNode ) and not ( span . process_matched ( client_sys_span ) \
2309+ and span .time_range_cover (client_sys_span , allow_lost_resp = True )):
22682310 return None , ""
22692311
22702312 sys_span_matched = x_request_id_match = same_process_trace_match = False
@@ -2306,7 +2348,29 @@ def try_attach_client_sys_span_via_sys_span(self,
23062348 # 同一进程下,如果既有 x_request_id 匹配关系,也有 syscall_trace_id 匹配,如果扫描 process_span_set 顺序不同,会导致挂错
23072349 # 对此类情况,先不要直接追加,应追加到【时间最接近】的一个 process_span_set
23082350 return span , f"c-p sys-span mounted due to { mounted_info } "
2309- return None , ""
2351+ return None , mounted_info
2352+
2353+ def try_attach_client_sys_span_via_preceding_sys_span (
2354+ self , client_sys_span : SysSpanNode ):
2355+ '''
2356+ 这里与上面 try_attach_client_sys_span_via_sys_span 不同点在于:
2357+ 这里尝试解决「异步」的问题,try_attach_client_sys_span_via_sys_span 处理的是同一进程内,服务端进程覆盖客户端进程的情况
2358+ 这里处理服务端进程不覆盖,且早于客户端进程结束的情况
2359+ 因此,这里的逻辑是纯粹的「推断」逻辑,并非完全可靠的强关联。
2360+ 关联条件:process_id + trace_id + s-p 时间必须小于 c-p
2361+ '''
2362+ mounted_info = ""
2363+ for span in self .spans :
2364+ if span .tap_side == const .TAP_SIDE_SERVER_PROCESS :
2365+ if isinstance (span , SysSpanNode ) and not (span .process_matched (client_sys_span ) \
2366+ and span .time_full_before (client_sys_span , allow_lost_resp = True )):
2367+ return None , ""
2368+ same_trace_id_match = self .g_trace_id & set (
2369+ client_sys_span .flow ["trace_id" ])
2370+ if same_trace_id_match :
2371+ mounted_info = "preceding c-p as async span after s-p root by same process/trace_id"
2372+ return span , f"c-p sys-span mounted due to { mounted_info } "
2373+ return None , mounted_info
23102374
23112375 def try_append_to_client_sys_span_via_sys_span (
23122376 self , client_sys_span : SysSpanNode ):
@@ -2356,7 +2420,7 @@ def try_append_to_client_sys_span_via_sys_span(
23562420
23572421 if x_request_id_match or same_process_trace_match :
23582422 return span , f"s-p sys-span mounted due to { mounted_info } "
2359- return None , ""
2423+ return None , mounted_info
23602424
23612425 def indirect_attach_client_sys_span_via_sys_span (
23622426 self , server_sys_span : SpanNode ,
@@ -2554,6 +2618,9 @@ def merge_flow(flows: list, flow: dict) -> bool:
25542618 L7_PROTOCOL_GRPC , L7_PROTOCOL_HTTP2
25552619 ]:
25562620 return False
2621+ # 避免在这里错误合并 async flow,直接 return
2622+ if flow ['is_async' ]:
2623+ return False
25572624
25582625 # for special case: DNS sys span
25592626 is_sys_span = flow ['tap_side' ] in [
@@ -2577,6 +2644,8 @@ def merge_flow(flows: list, flow: dict) -> bool:
25772644 if flows [i ][
25782645 'type' ] != L7_FLOW_TYPE_REQUEST and not allow_merge_type : # 不满足条件的仅需要合并至 REQUEST
25792646 continue
2647+ if flows [i ]['is_async' ]:
2648+ continue
25802649
25812650 # 通过 vtap_id + flow_id + request_id 匹配到同一个 Request
25822651 # vtap_id + flow_id:唯一确定一条 L4 Flow
@@ -2696,6 +2765,8 @@ def sort_all_flows(dataframe_flows: DataFrame, network_delay_us: int,
26962765 flow [key ] = None
26972766 else :
26982767 flow [key ] = value
2768+ if flow ['l7_protocol' ] == L7_PROTOCOL_WEBSPHERE_MQ :
2769+ pass
26992770 if merge_flow (flows , flow ): # 合并单向Flow为会话
27002771 continue
27012772 # 合并异步 flow 为会话
@@ -3020,6 +3091,26 @@ def _union_sys_spans(
30203091 target_child = matched_child
30213092 target_child_info = matched_child_info
30223093
3094+ # 上面准确的逻辑执行完之后,执行「推断」的逻辑
3095+ # 这里的逻辑是,当一个客户端进程找不到时间上覆盖的服务端进程,但找到了一个同进程、同 trace_id 时间提前的服务端进程
3096+ # 那么,这里认为这个客户端进程是一个异步调用,在服务端进程结束之后仍然没结束,标记父子关系
3097+ # 如果上面的逻辑找到了 target_parent,这里肯定不用执行了
3098+ # 这里只对「准确的逻辑」中找不到 target_parent 的情况做补偿处理
3099+ if target_parent is None :
3100+ for sp_process_span_set in target_sp_list :
3101+ matched_parent , mounted_info = sp_process_span_set .try_attach_client_sys_span_via_preceding_sys_span (
3102+ span )
3103+ if matched_parent is not None :
3104+ if target_parent is None :
3105+ target_parent = matched_parent
3106+ target_parent_info = mounted_info
3107+ elif matched_parent .flow [
3108+ 'start_time_us' ] > target_parent .flow [
3109+ 'start_time_us' ]:
3110+ # 在有多个 s-p 都满足匹配条件的情况下,选开始时间最大的(在满足时间覆盖的情况下,这说明此 s-p 最接近 c-p),它更有可能是直接的【上一跳】
3111+ target_parent = matched_parent
3112+ target_parent_info = mounted_info
3113+
30233114 if target_parent is not None :
30243115 target_parent .process_span_set .append_sys_span (span )
30253116 span .set_parent (target_parent , target_parent_info ,
@@ -3312,8 +3403,15 @@ def _connect_process_and_networks(
33123403 or _same_span_set (net_parent , net_child , 'process_span_set' ):
33133404 continue
33143405
3406+ # 这里 network tcp match 的情况比较特殊,由于可能有异步七层网关+x_request_id 的场景存在,所以不能判断 time_range_cover
3407+ # 但依旧可以简单地判断 time_start_before,即对同一个 agent_id,要求 net_parent 开始时间 < net_child 开始时间
3408+ if net_parent .signal_source != L7_FLOW_SIGNAL_SOURCE_OTEL and net_child .signal_source != L7_FLOW_SIGNAL_SOURCE_OTEL :
3409+ if net_parent .agent_id == net_child .agent_id and not net_parent .time_start_before (
3410+ net_child ):
3411+ continue
3412+
33153413 # 由于 x_request_id 没有 parent_id 这类设计,容易发生环路,故 net span 都要根据时延判断先后顺序
3316- if (net_parent_x_request_id_1 and net_parent_x_request_id_1 == net_child .get_x_request_id_0 ())\
3414+ if (net_parent_x_request_id_1 and net_parent_x_request_id_1 == net_child .get_x_request_id_0 ()) \
33173415 or (net_parent_x_request_id_0 and net_parent_x_request_id_0 == net_child .get_x_request_id_0 ()) \
33183416 or (net_parent_x_request_id_1 and net_parent_x_request_id_1 == net_child .get_x_request_id_1 ()) \
33193417 or (net_parent_span_id and net_parent_span_id == net_child .get_span_id ()):
@@ -3354,7 +3452,7 @@ def _connect_process_and_networks(
33543452 if net_child .get_parent_id () >= 0 :
33553453 continue
33563454 if _same_span_set (net_parent , net_child , 'network_span_set' ) \
3357- or _same_span_set (net_parent , net_child , 'process_span_set' ):
3455+ or _same_span_set (net_parent , net_child , 'process_span_set' ):
33583456 continue
33593457 if net_parent_l7_protocol in [L7_PROTOCOL_HTTP2 , L7_PROTOCOL_GRPC ] \
33603458 and net_parent_l7_protocol == net_child .flow ['l7_protocol' ] \
@@ -3730,8 +3828,10 @@ def correct_span_time(flows: dict, host_clock_correction: dict,
37303828 # should verify `agent` by instance_to_agent record by Ebpf/Packet signal source
37313829 agent_id = instance_to_agent .get (flow ['auto_instance' ],
37323830 flow ['vtap_id' ])
3733- flow ['render_start_time_us' ] = flow ['start_time_us' ] + host_clock_correction .get (agent_id , 0 )
3734- flow ['render_end_time_us' ] = flow ['end_time_us' ] + host_clock_correction .get (agent_id , 0 )
3831+ flow ['render_start_time_us' ] = flow [
3832+ 'start_time_us' ] + host_clock_correction .get (agent_id , 0 )
3833+ flow ['render_end_time_us' ] = flow [
3834+ 'end_time_us' ] + host_clock_correction .get (agent_id , 0 )
37353835
37363836
37373837def format_final_result (
0 commit comments