From 808fe91e3d48d1ad602e62b665dadd81cfa912c5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gr=C3=A9goire=20Deveaux?= Date: Fri, 2 Jan 2026 17:38:06 +0100 Subject: [PATCH] Make ConcurrentMultiSpanProcessor fork safe --- .../src/opentelemetry/sdk/trace/__init__.py | 12 ++++ .../tests/trace/test_span_processor.py | 69 +++++++++++++++++++ 2 files changed, 81 insertions(+) diff --git a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py index 0e7e1f6db3..9ae9a2234e 100644 --- a/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py +++ b/opentelemetry-sdk/src/opentelemetry/sdk/trace/__init__.py @@ -18,9 +18,11 @@ import concurrent.futures import json import logging +import os import threading import traceback import typing +import weakref from os import environ from time import time_ns from types import MappingProxyType, TracebackType @@ -238,6 +240,16 @@ def __init__(self, num_threads: int = 2): # iterating through it on "on_start" and "on_end". self._span_processors = () # type: Tuple[SpanProcessor, ...] self._lock = threading.Lock() + self._init_executor(num_threads) + if hasattr(os, "register_at_fork"): + # Only the main thread is kept in forked processes, the executor + # needs to be re-instantiated to get a fresh pool of threads: + weak_reinit = weakref.WeakMethod(self._init_executor) + os.register_at_fork( + after_in_child=lambda: weak_reinit()(num_threads) + ) + + def _init_executor(self, num_threads: int) -> None: self._executor = concurrent.futures.ThreadPoolExecutor( max_workers=num_threads ) diff --git a/opentelemetry-sdk/tests/trace/test_span_processor.py b/opentelemetry-sdk/tests/trace/test_span_processor.py index d1cf1e3df0..9b6312aa59 100644 --- a/opentelemetry-sdk/tests/trace/test_span_processor.py +++ b/opentelemetry-sdk/tests/trace/test_span_processor.py @@ -13,9 +13,13 @@ # limitations under the License.
import abc +import gc +import multiprocessing +import os import time import typing import unittest +import weakref from platform import python_implementation, system from threading import Event from typing import Optional @@ -26,6 +30,10 @@ from opentelemetry import trace as trace_api from opentelemetry.context import Context from opentelemetry.sdk import trace +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import ( + InMemorySpanExporter, +) def span_event_start_fmt(span_processor_name, span_name): @@ -486,3 +494,64 @@ def test_force_flush_late_by_span_processor(self): for mock_processor in mocks: self.assertEqual(1, mock_processor.force_flush.call_count) multi_processor.shutdown() + + def test_processor_gc(self): + multi_processor = trace.ConcurrentMultiSpanProcessor(5) + weak_ref = weakref.ref(multi_processor) + multi_processor.shutdown() + + # When the processor is garbage collected + del multi_processor + gc.collect() + + # Then the reference to the processor should no longer exist + self.assertIsNone( + weak_ref(), + "The ConcurrentMultiSpanProcessor object created by this test wasn't garbage collected", + ) + + @unittest.skipUnless(hasattr(os, "fork"), "needs *nix") + def test_batch_span_processor_fork(self): + multiprocessing.set_start_method("fork") + tracer_provider = trace.TracerProvider() + tracer = tracer_provider.get_tracer(__name__) + exporter = InMemorySpanExporter() + multi_processor = trace.ConcurrentMultiSpanProcessor(2) + multi_processor.add_span_processor(SimpleSpanProcessor(exporter)) + tracer_provider.add_span_processor(multi_processor) + + # Use the ConcurrentMultiSpanProcessor in the main process. 
+ # This is necessary in this test to start using the underlying ThreadPoolExecutor and avoid a false positive: + with tracer.start_as_current_span("main process before fork span"): + pass + assert ( + exporter.get_finished_spans()[-1].name + == "main process before fork span" + ) + + # The forked ConcurrentMultiSpanProcessor is usable in the child process: + def child(conn): + with tracer.start_as_current_span("child process span"): + pass + conn.send(exporter.get_finished_spans()[-1].name) + conn.close() + + parent_conn, child_conn = multiprocessing.Pipe() + process = multiprocessing.Process(target=child, args=(child_conn,)) + process.start() + has_response = parent_conn.poll(timeout=5) + if not has_response: + process.kill() + self.fail( + "The child process did not send any message after 5 seconds, it's very probably locked" + ) + process.join(timeout=5) + assert parent_conn.recv() == "child process span" + + # The ConcurrentMultiSpanProcessor is still usable in the main process after the child process termination: + with tracer.start_as_current_span("main process after fork span"): + pass + assert ( + exporter.get_finished_spans()[-1].name + == "main process after fork span" + )