From 7bc0858519c035c4d9565b209ee0ba0a28ddffc1 Mon Sep 17 00:00:00 2001 From: Sean Brar Date: Wed, 4 Feb 2026 21:32:02 -0800 Subject: [PATCH 1/2] feat(core): implement v0.2.0 zero-cost telemetry logic --- src/nullscope/__init__.py | 112 +++++++++++++++++++++++++++++++++++--- 1 file changed, 103 insertions(+), 9 deletions(-) diff --git a/src/nullscope/__init__.py b/src/nullscope/__init__.py index d596077..b8b7782 100644 --- a/src/nullscope/__init__.py +++ b/src/nullscope/__init__.py @@ -4,17 +4,19 @@ metrics when enabled via environment flags. """ +import inspect import logging import os import re import time from collections import deque -from collections.abc import Iterator +from collections.abc import Callable, Iterator from contextlib import AbstractContextManager, contextmanager from contextvars import ContextVar from dataclasses import dataclass +from functools import wraps from types import TracebackType -from typing import Any, Final, Protocol, TypeAlias, TypedDict, runtime_checkable +from typing import Any, Final, Protocol, TypeAlias, TypedDict, TypeVar, cast, runtime_checkable log = logging.getLogger(__name__) @@ -41,6 +43,9 @@ END_WALL_TIME_S: Final[str] = "end_wall_time_s" _SCOPE_NAME_PATTERN = re.compile(r"^[a-z][a-z0-9_]*(?:\.[a-z][a-z0-9_]*)*$") +_REQUIRED_REPORTER_METHODS: Final[tuple[str, ...]] = ("record_timing", "record_metric") + +F = TypeVar("F", bound=Callable[..., Any]) class _TelemetryMetadata(TypedDict, total=False): @@ -61,6 +66,36 @@ def record_timing(self, scope: str, duration: float, **metadata: Any) -> None: . def record_metric(self, scope: str, value: Any, **metadata: Any) -> None: ... # noqa: D102 +def _validate_scope_name(name: str, *, kind: str) -> str: + """Validate user-provided scope names with explicit errors.""" + if not isinstance(name, str): + raise TypeError(f"{kind} must be a string, got {type(name).__name__}") + if not name: + raise ValueError(f"{kind} must be a non-empty string") + if _STRICT_SCOPES and not _SCOPE_NAME_PATTERN.fullmatch(name): + raise ValueError( + f"Invalid {kind} '{name}'. Expected dot-separated lowercase segments " + "using [a-z0-9_], e.g. 'http.request'.", + ) + return name + + +def _validate_reporters(reporters: tuple[Any, ...]) -> None: + """Check reporter shape early so failures are obvious at setup time.""" + for index, reporter in enumerate(reporters): + missing = [ + method + for method in _REQUIRED_REPORTER_METHODS + if not callable(getattr(reporter, method, None)) + ] + if missing: + missing_list = ", ".join(missing) + raise TypeError( + f"Reporter at position {index} ({type(reporter).__name__}) is invalid. " + f"Missing required callable method(s): {missing_list}.", + ) + + @dataclass(frozen=True, slots=True) class _NoOpTelemetryContext: """An immutable and stateless no-op context, optimized for negligible overhead.""" @@ -95,6 +130,16 @@ def count(self, name: str, increment: int = 1, **metadata: Any) -> None: def gauge(self, name: str, value: float, **metadata: Any) -> None: pass + def timed(self, name: str, **metadata: Any) -> Callable[[F], F]: # noqa: ARG002 + """No-op decorator that returns the original function unchanged.""" + return lambda func: func + + def flush(self) -> None: + pass + + def shutdown(self) -> None: + pass + class _EnabledTelemetryContext: """Full-featured telemetry context when enabled.""" @@ -114,13 +159,7 @@ def _create_scope( self, name: str, **metadata: Any ) -> Iterator["_EnabledTelemetryContext"]: """The actual scope implementation when enabled.""" - if not name or not isinstance(name, str): - raise ValueError("Scope name must be a non-empty string") - if _STRICT_SCOPES and not _SCOPE_NAME_PATTERN.fullmatch(name): - raise ValueError( - "Invalid scope name. Use lowercase letters, digits, and underscores; " - "segments separated by dots.", - ) + _validate_scope_name(name, kind="Scope name") scope_stack = _scope_stack_var.get() call_count = _call_count_var.get() @@ -200,6 +239,53 @@ def gauge(self, name: str, value: float, **metadata: Any) -> None: """Record a gauge metric.""" self.metric(name, value, metric_type="gauge", **metadata) + def timed(self, name: str, **metadata: Any) -> Callable[[F], F]: + """Decorator that times function execution under a fixed scope name.""" + scope_name = _validate_scope_name(name, kind="Decorator scope name") + + def decorator(func: F) -> F: + if inspect.iscoroutinefunction(func): + @wraps(func) + async def async_wrapper(*args: Any, **kwargs: Any) -> Any: + with self(scope_name, **metadata): + return await func(*args, **kwargs) + + return cast("F", async_wrapper) + + @wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + with self(scope_name, **metadata): + return func(*args, **kwargs) + + return cast("F", wrapper) + + return decorator + + def _call_reporter_hook(self, hook_name: str) -> None: + """Call an optional lifecycle hook on all reporters.""" + for reporter in self.reporters: + hook = getattr(reporter, hook_name, None) + if hook is None or not callable(hook): + continue + try: + hook() + except Exception as e: + log.error( + "Reporter '%s' failed during %s: %s", + type(reporter).__name__, + hook_name, + e, + exc_info=True, + ) + + def flush(self) -> None: + """Flush reporters that implement optional lifecycle hooks.""" + self._call_reporter_hook("flush") + + def shutdown(self) -> None: + """Shutdown reporters that implement optional lifecycle hooks.""" + self._call_reporter_hook("shutdown") + @property def is_enabled(self) -> bool: return True @@ -221,6 +307,8 @@ def TelemetryContext(*reporters: TelemetryReporter) -> TelemetryContextProtocol: overhead. """ if _NULLSCOPE_ENABLED: + if reporters: + _validate_reporters(reporters) reps = reporters or (SimpleReporter(),) return _EnabledTelemetryContext(*reps) # Always return the same, pre-existing no-op instance. @@ -259,6 +347,12 @@ def reset(self) -> None: self.timings.clear() self.metrics.clear() + def flush(self) -> None: + """No-op lifecycle method for API compatibility.""" + + def shutdown(self) -> None: + """No-op lifecycle method for API compatibility.""" + def as_dict(self) -> dict[str, Any]: """Return a JSON-serializable snapshot of collected data (testing).""" return { From 9b71870c92e9f81255b4b2fcdb72ace93a596303 Mon Sep 17 00:00:00 2001 From: Sean Brar Date: Wed, 4 Feb 2026 22:38:16 -0800 Subject: [PATCH 2/2] perf(core): replace contextmanager with explicit _Scope class Optimize scope timing by using an explicit context manager class instead of @contextmanager decorator. This reduces overhead from generator-based context managers and avoids lambda allocation in no-op decorators. --- src/nullscope/__init__.py | 149 ++++++++++++++++++++++++-------------- 1 file changed, 93 insertions(+), 56 deletions(-) diff --git a/src/nullscope/__init__.py b/src/nullscope/__init__.py index b8b7782..3b05e46 100644 --- a/src/nullscope/__init__.py +++ b/src/nullscope/__init__.py @@ -10,8 +10,8 @@ import re import time from collections import deque -from collections.abc import Callable, Iterator -from contextlib import AbstractContextManager, contextmanager +from collections.abc import Callable +from contextlib import AbstractContextManager from contextvars import ContextVar from dataclasses import dataclass from functools import wraps @@ -48,6 +48,11 @@ F = TypeVar("F", bound=Callable[..., Any]) +def _identity(func: F) -> F: + """Identity function for no-op decorator (avoids lambda allocation).""" + return func + + class _TelemetryMetadata(TypedDict, total=False): depth: int call_count: int @@ -132,7 +137,7 @@ def gauge(self, name: str, value: float, **metadata: Any) -> None: def timed(self, name: str, **metadata: Any) -> Callable[[F], F]: # noqa: ARG002 """No-op decorator that returns the original function unchanged.""" - return lambda func: func + return _identity def flush(self) -> None: pass @@ -141,6 +146,87 @@ def shutdown(self) -> None: pass +class _Scope: + """Explicit context manager for scope timing (replaces @contextmanager for performance).""" + + __slots__ = ( + "_ctx", + "_scope_path", + "_metadata", + "_start_monotonic_s", + "_start_wall_time_s", + "_scope_token", + "_count_token", + ) + + def __init__( + self, ctx: "_EnabledTelemetryContext", name: str, metadata: dict[str, Any] + ) -> None: + _validate_scope_name(name, kind="Scope name") + self._ctx = ctx + self._metadata = metadata + + # Capture current state and compute new stack once + scope_stack = _scope_stack_var.get() + new_stack = (*scope_stack, name) + self._scope_path = ".".join(new_stack) + + # Set up context for entry + self._scope_token = _scope_stack_var.set(new_stack) + self._count_token = _call_count_var.set(_call_count_var.get() + 1) + + # Capture timing at construction (will be finalized in __enter__) + self._start_monotonic_s: float = 0.0 + self._start_wall_time_s: float = 0.0 + + def __enter__(self) -> "_EnabledTelemetryContext": + self._start_monotonic_s = time.perf_counter() + self._start_wall_time_s = time.time() + return self._ctx + + def __exit__( + self, + exc_type: type[BaseException] | None, + exc_val: BaseException | None, + exc_tb: TracebackType | None, + ) -> None: + end_monotonic_s = time.perf_counter() + duration = end_monotonic_s - self._start_monotonic_s + end_wall_time_s = self._start_wall_time_s + duration + + # Reset context vars + _scope_stack_var.reset(self._scope_token) + _call_count_var.reset(self._count_token) + + # Capture final state for metadata + final_stack = _scope_stack_var.get() + final_count = _call_count_var.get() + + # Build metadata (user metadata overrides built-in) + built: _TelemetryMetadata = { + "depth": len(final_stack), + "call_count": final_count, + "parent_scope": ".".join(final_stack) if final_stack else None, + "start_monotonic_s": self._start_monotonic_s, + "end_monotonic_s": end_monotonic_s, + "start_wall_time_s": self._start_wall_time_s, + "end_wall_time_s": end_wall_time_s, + } + enhanced_metadata: dict[str, Any] = {**built, **self._metadata} + + # Report to all reporters + for reporter in self._ctx.reporters: + try: + reporter.record_timing(self._scope_path, duration, **enhanced_metadata) + except Exception as e: + log.error( + "Telemetry reporter '%s' failed: %s", + type(reporter).__name__, + e, + exc_info=True, + ) + + class _EnabledTelemetryContext: """Full-featured telemetry context when enabled.""" @@ -152,65 +238,16 @@ def __init__(self, *reporters: TelemetryReporter): def __call__( self, name: str, **metadata: Any ) -> AbstractContextManager["_EnabledTelemetryContext"]: - return self._create_scope(name, **metadata) - - @contextmanager - def _create_scope( - self, name: str, **metadata: Any - ) -> Iterator["_EnabledTelemetryContext"]: - """The actual scope implementation when enabled.""" - _validate_scope_name(name, kind="Scope name") - - scope_stack = _scope_stack_var.get() - call_count = _call_count_var.get() - scope_path = ".".join((*scope_stack, name)) - start_monotonic_s = time.perf_counter() - start_wall_time_s = time.time() - - scope_token = _scope_stack_var.set((*scope_stack, name)) - count_token = _call_count_var.set(call_count + 1) - - try: - yield self # Return self so chained methods work - finally: - end_monotonic_s = time.perf_counter() - duration = end_monotonic_s - start_monotonic_s - end_wall_time_s = start_wall_time_s + duration - _scope_stack_var.reset(scope_token) - _call_count_var.reset(count_token) - - final_stack = _scope_stack_var.get() - final_count = _call_count_var.get() - - built: _TelemetryMetadata = { - "depth": len(final_stack), - "call_count": final_count, - "parent_scope": ".".join(final_stack) if final_stack else None, - "start_monotonic_s": start_monotonic_s, - "end_monotonic_s": end_monotonic_s, - "start_wall_time_s": start_wall_time_s, - "end_wall_time_s": end_wall_time_s, - } - enhanced_metadata: dict[str, Any] = {**built, **metadata} - - for reporter in self.reporters: - try: - reporter.record_timing(scope_path, duration, **enhanced_metadata) - except Exception as e: - log.error( - "Telemetry reporter '%s' failed: %s", - type(reporter).__name__, - e, - exc_info=True, - ) + return _Scope(self, name, metadata) def metric(self, name: str, value: Any, **metadata: Any) -> None: """Record a metric within current scope context.""" scope_stack = _scope_stack_var.get() - scope_path = ".".join((*scope_stack, name)) + parent_scope = ".".join(scope_stack) if scope_stack else None + scope_path = f"{parent_scope}.{name}" if parent_scope else name built: _TelemetryMetadata = { "depth": len(scope_stack), - "parent_scope": ".".join(scope_stack) if scope_stack else None, + "parent_scope": parent_scope, } enhanced_metadata: dict[str, Any] = {**built, **metadata} for reporter in self.reporters: