diff --git a/app/app.yaml b/app/app.yaml index 92c1536..5eca977 100644 --- a/app/app.yaml +++ b/app/app.yaml @@ -42,4 +42,11 @@ app: # tcp_seq: network layer tracing through TCP seq # x_request_id: application layer tracing through X-Request-ID/http header # dns: DNS protocol tracing through DNS transaction-id (request_id), IT'S DISABLED BY DEFAULT, REQUIRED ENABLED MANUALLY IF NEEDED - tracing_source: [] \ No newline at end of file + tracing_source: [] + # span_set connection strategies for building span-tree, default: [] (no extra weak connections) + # ONLY controls cross span_set connections with WEAK relations(which means maybe incorrect, but better than no connection) + # available options: [net_span_c_to_s_via_trace_id] + # each strategy enables one type of weak cross span_set connection when strong evidence (tcp_seq/x_request_id/span_id) is absent: + # net_span_c_to_s_via_trace_id: connect a client-side leaf NetSpan to a server-side root NetSpan + # when they share the same trace_id but have different tcp_seq + span_set_connection_strategies: [] \ No newline at end of file diff --git a/app/app/application/l7_flow_tracing.py b/app/app/application/l7_flow_tracing.py index 489acf5..883fc64 100644 --- a/app/app/application/l7_flow_tracing.py +++ b/app/app/application/l7_flow_tracing.py @@ -3456,7 +3456,8 @@ def _connect_process_and_networks( else: last_parent_index, _ = network_match_parent[_index] last_matched_parent = flow_index_to_span[last_parent_index] - if last_matched_parent.get_response_duration() > net_parent_response_duration: + if last_matched_parent.get_response_duration( + ) > net_parent_response_duration: # 根据 `时延最接近` 原则找 parent # 即在满足条件的 parent 里找到时延最接近最小的 net_parent,它更有可能是直接的 `上一跳` # network_match_parent[net_child_index] 指向 net_parent 的 _index,从 flow_index_to_span 中取 response_duration @@ -3491,6 +3492,12 @@ def _connect_process_and_networks( net_parent, "net_span mounted due to grpc request_id", host_clock_correct_callback) + flow_index_to_trace_id_sets: Dict[int, Set] = {} + for net_span in network_leafs + network_roots: + if not net_span.get_trace_id(): + continue + flow_index_to_trace_id_sets[net_span.get_flow_index()] = set( + net_span.get_trace_id()) # 6. 特殊逻辑处理 # 这里单独对 [network_span] 进行连接处理,仅处理「指定的协议、具有 trace_id 的场景」 # 目前处理:仅对 WebSphereMQ 协议,具有 is_async 标识的 c-s 之间互相连接 @@ -3507,10 +3514,11 @@ def _connect_process_and_networks( if not net_parent.tap_side.startswith('c'): continue # 只能基于 trace_id 做交集判断 - net_parent_trace_id = net_parent.get_trace_id() + net_parent_index = net_parent.get_flow_index() + net_parent_trace_id = flow_index_to_trace_id_sets.get( + net_parent_index, set()) if not net_parent_trace_id: continue - net_parent_index = net_parent.get_flow_index() net_parent_start_time = net_parent.get_start_time() # 由于这里的关联关系是 trace_id,等于没有任何关联关系,因此实际上是「硬挂」的 # XXX: 这里做 O(n2) 遍历需要考虑下性能 @@ -3531,10 +3539,8 @@ def _connect_process_and_networks( if _same_span_set(net_parent, net_child, 'network_span_set') \ or _same_span_set(net_parent, net_child, 'process_span_set'): continue - net_child_trace_id = net_child.get_trace_id() - if not net_child_trace_id: - continue - if not (set(net_parent_trace_id) & set(net_child_trace_id)): + if not (net_parent_trace_id + & flow_index_to_trace_id_sets.get(net_child_index, set())): continue # 这里 network tcp match 的情况比较特殊,由于实际上就是异步,所以不能判断 time_range_cover # 但依旧可以简单地判断 time_start_before,即对同一个 agent_id,要求 net_parent 开始时间 < net_child 开始时间 @@ -3547,9 +3553,11 @@ def _connect_process_and_networks( net_parent_index, "net_span mounted due to webspheremq async trace_id") else: - last_parent_index, last_mounted_info = network_match_parent[net_child_index] + last_parent_index, last_mounted_info = network_match_parent[ + net_child_index] last_matched_parent = flow_index_to_span[last_parent_index] - if last_matched_parent.get_start_time() < net_parent_start_time: + if last_matched_parent.get_start_time( + ) < net_parent_start_time: # 如果有多个符合,直接取开始时间最大的,即最接近的 # 这里的 mounted_info 是固定的,直接复用即可 network_match_parent[net_child_index] = (net_parent_index, @@ -3565,6 +3573,72 @@ def _connect_process_and_networks( mounted_info, host_clock_correct_callback) + # 在上面的「准确」逻辑执行之后,执行「可能不准确」的逻辑 + weak_match_parent: Dict[int, int] = {} + + # 7. 通过 trace_id 关联:叶子 span(客户端侧,无子节点)→ 根 span(服务端侧,无父节点) + # 条件:trace_id 有交集,tcp_seq 不同,叶子时延 >= 根时延 + if 'net_span_c_to_s_via_trace_id' in config.span_set_connection_strategies: + for net_parent in network_leafs: + if net_parent.children_count > 0: + continue + if not net_parent.tap_side.startswith('c'): + continue + net_parent_index = net_parent.get_flow_index() + net_parent_trace_id = flow_index_to_trace_id_sets.get( + net_parent_index, set()) + if not net_parent_trace_id: + continue + net_parent_response_duration = net_parent.get_response_duration() + net_parent_req_tcp_seq = net_parent.get_req_tcp_seq() + net_parent_resp_tcp_seq = net_parent.get_resp_tcp_seq() + for net_child in network_roots: + if not net_child.is_net_root: + continue + net_child_index = net_child.get_flow_index() + if net_parent_index == net_child_index: + continue + if net_child.get_parent_id() >= 0: + continue + if not net_child.tap_side.startswith('s'): + continue + if _same_span_set(net_parent, net_child, 'network_span_set') \ + or _same_span_set(net_parent, net_child, 'process_span_set'): + continue + if net_parent.signal_source != L7_FLOW_SIGNAL_SOURCE_OTEL and net_child.signal_source != L7_FLOW_SIGNAL_SOURCE_OTEL: + if net_parent.agent_id == net_child.agent_id and not net_parent.time_start_before( + net_child): + continue + # tcp_seq 不同(不应该已在同一 NetworkSpanSet 内) + if net_parent_req_tcp_seq and net_parent_req_tcp_seq == net_child.get_req_tcp_seq( + ): + continue + if net_parent_resp_tcp_seq and net_parent_resp_tcp_seq == net_child.get_resp_tcp_seq( + ): + continue + if not (net_parent_trace_id & flow_index_to_trace_id_sets.get( + net_child_index, set())): + continue + # 叶子 span 时延 >= 根 span 时延 + if net_parent_response_duration < net_child.get_response_duration( + ): + continue + if net_child_index not in weak_match_parent: + weak_match_parent[net_child_index] = net_parent_index + else: + last_parent_index = weak_match_parent[net_child_index] + last_matched_parent = flow_index_to_span[last_parent_index] + if last_matched_parent.get_response_duration( + ) > net_parent_response_duration: + # 取时延最接近(最小满足条件)的 parent + weak_match_parent[net_child_index] = net_parent_index + + for child, parent in weak_match_parent.items(): + flow_index_to_span[child].set_parent( + flow_index_to_span[parent], + "net_span c->s weakly mounted due to trace_id", + host_clock_correct_callback) + def format_trace(services: List[ProcessSpanSet], networks: List[NetworkSpanSet]) -> dict: diff --git a/app/app/config.py b/app/app/config.py index f9e0dd5..0705773 100644 --- a/app/app/config.py +++ b/app/app/config.py @@ -27,6 +27,10 @@ def parse_spec(self, cfg): self.tracing_source = spec.get( 'tracing_source', ["trace_id", "syscall", "tcp_seq", "x_request_id"]) + strategies = spec.get('span_set_connection_strategies', []) + if not isinstance(strategies, list): + strategies = [] + self.span_set_connection_strategies = strategies def parse_querier(self, cfg): querier = cfg.get('querier', dict())