Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion app/app.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,11 @@ app:
# tcp_seq: network layer tracing through TCP seq
# x_request_id: application layer tracing through X-Request-ID/http header
# dns: DNS protocol tracing through DNS transaction-id (request_id), IT'S DISABLED BY DEFAULT, REQUIRED ENABLED MANUALLY IF NEEDED
tracing_source: []
tracing_source: []
# span_set connection strategies for building span-tree, default: [] (no extra weak connections)
# ONLY controls cross span_set connections with WEAK relations(which means maybe incorrect, but better than no connection)
# available options: [net_span_c_to_s_via_trace_id]
# each strategy enables one type of weak cross span_set connection when strong evidence (tcp_seq/x_request_id/span_id) is absent:
# net_span_c_to_s_via_trace_id: connect a client-side leaf NetSpan to a server-side root NetSpan
# when they share the same trace_id but have different tcp_seq
span_set_connection_strategies: []
92 changes: 83 additions & 9 deletions app/app/application/l7_flow_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -3456,7 +3456,8 @@ def _connect_process_and_networks(
else:
last_parent_index, _ = network_match_parent[_index]
last_matched_parent = flow_index_to_span[last_parent_index]
if last_matched_parent.get_response_duration() > net_parent_response_duration:
if last_matched_parent.get_response_duration(
) > net_parent_response_duration:
# 根据 `时延最接近` 原则找 parent
# 即在满足条件的 parent 里找到时延最接近最小的 net_parent,它更有可能是直接的 `上一跳`
# network_match_parent[net_child_index] 指向 net_parent 的 _index,从 flow_index_to_span 中取 response_duration
Expand Down Expand Up @@ -3491,6 +3492,12 @@ def _connect_process_and_networks(
net_parent, "net_span mounted due to grpc request_id",
host_clock_correct_callback)

flow_index_to_trace_id_sets: Dict[int, Set] = {}
for net_span in network_leafs + network_roots:
if not net_span.get_trace_id():
continue
flow_index_to_trace_id_sets[net_span.get_flow_index()] = set(
net_span.get_trace_id())
# 6. 特殊逻辑处理
# 这里单独对 [network_span] 进行连接处理,仅处理「指定的协议、具有 trace_id 的场景」
# 目前处理:仅对 WebSphereMQ 协议,具有 is_async 标识的 c-s 之间互相连接
Expand All @@ -3507,10 +3514,11 @@ def _connect_process_and_networks(
if not net_parent.tap_side.startswith('c'):
continue
# 只能基于 trace_id 做交集判断
net_parent_trace_id = net_parent.get_trace_id()
net_parent_index = net_parent.get_flow_index()
net_parent_trace_id = flow_index_to_trace_id_sets.get(
net_parent_index, set())
if not net_parent_trace_id:
continue
net_parent_index = net_parent.get_flow_index()
net_parent_start_time = net_parent.get_start_time()
# 由于这里的关联关系是 trace_id,等于没有任何关联关系,因此实际上是「硬挂」的
# XXX: 这里做 O(n2) 遍历需要考虑下性能
Expand All @@ -3531,10 +3539,8 @@ def _connect_process_and_networks(
if _same_span_set(net_parent, net_child, 'network_span_set') \
or _same_span_set(net_parent, net_child, 'process_span_set'):
continue
net_child_trace_id = net_child.get_trace_id()
if not net_child_trace_id:
continue
if not (set(net_parent_trace_id) & set(net_child_trace_id)):
if not (net_parent_trace_id
& flow_index_to_trace_id_sets.get(net_child_index, set())):
continue
# 这里 network tcp match 的情况比较特殊,由于实际上就是异步,所以不能判断 time_range_cover
# 但依旧可以简单地判断 time_start_before,即对同一个 agent_id,要求 net_parent 开始时间 < net_child 开始时间
Expand All @@ -3547,9 +3553,11 @@ def _connect_process_and_networks(
net_parent_index,
"net_span mounted due to webspheremq async trace_id")
else:
last_parent_index, last_mounted_info = network_match_parent[net_child_index]
last_parent_index, last_mounted_info = network_match_parent[
net_child_index]
last_matched_parent = flow_index_to_span[last_parent_index]
if last_matched_parent.get_start_time() < net_parent_start_time:
if last_matched_parent.get_start_time(
) < net_parent_start_time:
# 如果有多个符合,直接取开始时间最大的,即最接近的
# 这里的 mounted_info 是固定的,直接复用即可
network_match_parent[net_child_index] = (net_parent_index,
Expand All @@ -3565,6 +3573,72 @@ def _connect_process_and_networks(
mounted_info,
host_clock_correct_callback)

# 在上面的「准确」逻辑执行之后,执行「可能不准确」的逻辑
weak_match_parent: Dict[int, int] = {}

# 7. 通过 trace_id 关联:叶子 span(客户端侧,无子节点)→ 根 span(服务端侧,无父节点)
# 条件:trace_id 有交集,tcp_seq 不同,叶子时延 >= 根时延
if 'net_span_c_to_s_via_trace_id' in config.span_set_connection_strategies:
for net_parent in network_leafs:
if net_parent.children_count > 0:
continue
if not net_parent.tap_side.startswith('c'):
continue
net_parent_index = net_parent.get_flow_index()
net_parent_trace_id = flow_index_to_trace_id_sets.get(
net_parent_index, set())
if not net_parent_trace_id:
continue
net_parent_response_duration = net_parent.get_response_duration()
net_parent_req_tcp_seq = net_parent.get_req_tcp_seq()
net_parent_resp_tcp_seq = net_parent.get_resp_tcp_seq()
for net_child in network_roots:
if not net_child.is_net_root:
continue
net_child_index = net_child.get_flow_index()
if net_parent_index == net_child_index:
continue
if net_child.get_parent_id() >= 0:
continue
if not net_child.tap_side.startswith('s'):
continue
if _same_span_set(net_parent, net_child, 'network_span_set') \
or _same_span_set(net_parent, net_child, 'process_span_set'):
continue
if net_parent.signal_source != L7_FLOW_SIGNAL_SOURCE_OTEL and net_child.signal_source != L7_FLOW_SIGNAL_SOURCE_OTEL:
if net_parent.agent_id == net_child.agent_id and not net_parent.time_start_before(
net_child):
continue
# tcp_seq 不同(不应该已在同一 NetworkSpanSet 内)
if net_parent_req_tcp_seq and net_parent_req_tcp_seq == net_child.get_req_tcp_seq(
):
continue
if net_parent_resp_tcp_seq and net_parent_resp_tcp_seq == net_child.get_resp_tcp_seq(
):
continue
if not (net_parent_trace_id & flow_index_to_trace_id_sets.get(
net_child_index, set())):
continue
# 叶子 span 时延 >= 根 span 时延
if net_parent_response_duration < net_child.get_response_duration(
):
continue
if net_child_index not in weak_match_parent:
weak_match_parent[net_child_index] = net_parent_index
else:
last_parent_index = weak_match_parent[net_child_index]
last_matched_parent = flow_index_to_span[last_parent_index]
if last_matched_parent.get_response_duration(
) > net_parent_response_duration:
# 取时延最接近(最小满足条件)的 parent
weak_match_parent[net_child_index] = net_parent_index

for child, parent in weak_match_parent.items():
flow_index_to_span[child].set_parent(
flow_index_to_span[parent],
"net_span c->s weakly mounted due to trace_id",
host_clock_correct_callback)


def format_trace(services: List[ProcessSpanSet],
networks: List[NetworkSpanSet]) -> dict:
Expand Down
4 changes: 4 additions & 0 deletions app/app/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def parse_spec(self, cfg):
self.tracing_source = spec.get(
'tracing_source',
["trace_id", "syscall", "tcp_seq", "x_request_id"])
strategies = spec.get('span_set_connection_strategies', [])
if not isinstance(strategies, list):
strategies = []
self.span_set_connection_strategies = strategies

def parse_querier(self, cfg):
querier = cfg.get('querier', dict())
Expand Down
Loading