Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions drift/instrumentation/aiohttp/e2e-tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
services:
mock-upstream:
image: python:3.12-slim
command: ["python", "/mock/mock_server.py"]
working_dir: /mock
volumes:
- ../../e2e_common/mock_upstream:/mock:ro
environment:
- PYTHONUNBUFFERED=1

app:
build:
context: ../../../..
Expand All @@ -14,7 +23,11 @@ services:
- BENCHMARK_DURATION=${BENCHMARK_DURATION:-10}
- BENCHMARK_WARMUP=${BENCHMARK_WARMUP:-3}
- TUSK_SAMPLING_RATE=${TUSK_SAMPLING_RATE:-}
- USE_MOCK_EXTERNALS=${USE_MOCK_EXTERNALS:-1}
- MOCK_SERVER_BASE_URL=${MOCK_SERVER_BASE_URL:-http://mock-upstream:8081}
working_dir: /app
depends_on:
- mock-upstream
volumes:
# Mount SDK source for hot reload (no rebuild needed for SDK changes)
- ../../../..:/sdk
Expand Down
23 changes: 23 additions & 0 deletions drift/instrumentation/aiohttp/e2e-tests/src/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,10 @@
from flask import Flask, jsonify, request

from drift import TuskDrift
from drift.instrumentation.e2e_common.external_http import (
external_http_timeout_seconds,
upstream_url,
)

# Initialize SDK
sdk = TuskDrift.initialize(
Expand All @@ -14,6 +18,25 @@
)

app = Flask(__name__)
EXTERNAL_HTTP_TIMEOUT_SECONDS = external_http_timeout_seconds()


def _configure_aiohttp_for_mock_and_timeouts():
original_request = aiohttp.ClientSession._request

async def patched_request(self, method, str_or_url, *args, **kwargs):
session_timeout = getattr(self, "_timeout", None)
default_timeout = getattr(aiohttp.client, "DEFAULT_TIMEOUT", None)
using_default_session_timeout = session_timeout is default_timeout or session_timeout == default_timeout
if "timeout" not in kwargs and using_default_session_timeout:
kwargs["timeout"] = aiohttp.ClientTimeout(total=EXTERNAL_HTTP_TIMEOUT_SECONDS)
rewritten = upstream_url(str(str_or_url))
return await original_request(self, method, rewritten, *args, **kwargs)

aiohttp.ClientSession._request = patched_request


_configure_aiohttp_for_mock_and_timeouts()


# =============================================================================
Expand Down
13 changes: 13 additions & 0 deletions drift/instrumentation/django/e2e-tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,13 @@
services:
mock-upstream:
image: python:3.12-slim
command: ["python", "/mock/mock_server.py"]
working_dir: /mock
volumes:
- ../../e2e_common/mock_upstream:/mock:ro
environment:
- PYTHONUNBUFFERED=1

app:
build:
context: ../../../..
Expand All @@ -15,7 +24,11 @@ services:
- BENCHMARK_DURATION=${BENCHMARK_DURATION:-10}
- BENCHMARK_WARMUP=${BENCHMARK_WARMUP:-3}
- TUSK_SAMPLING_RATE=${TUSK_SAMPLING_RATE:-}
- USE_MOCK_EXTERNALS=${USE_MOCK_EXTERNALS:-1}
- MOCK_SERVER_BASE_URL=${MOCK_SERVER_BASE_URL:-http://mock-upstream:8081}
working_dir: /app
depends_on:
- mock-upstream
volumes:
# Mount SDK source for hot reload (no rebuild needed for SDK changes)
- ../../../..:/sdk
Expand Down
129 changes: 96 additions & 33 deletions drift/instrumentation/django/e2e-tests/src/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
from django.views.decorators.http import require_GET, require_http_methods, require_POST
from opentelemetry import context as otel_context

from drift.instrumentation.e2e_common.external_http import (
external_http_timeout_seconds,
upstream_url,
)

EXTERNAL_HTTP_TIMEOUT_SECONDS = external_http_timeout_seconds()


def _run_with_context(ctx, fn, *args, **kwargs):
"""Helper to run a function with OpenTelemetry context in a thread pool."""
Expand All @@ -31,12 +38,13 @@ def get_weather(request):
"""Fetch weather data from external API."""
try:
response = requests.get(
"https://api.open-meteo.com/v1/forecast",
upstream_url("https://api.open-meteo.com/v1/forecast"),
params={
"latitude": 40.7128,
"longitude": -74.0060,
"current_weather": "true",
},
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
)
weather = response.json()

Expand All @@ -47,87 +55,142 @@ def get_weather(request):
}
)
except Exception as e:
return JsonResponse({"error": f"Failed to fetch weather: {str(e)}"}, status=500)
return JsonResponse(
{
"location": "New York",
"weather": {},
"fallback": True,
"error": f"Failed to fetch weather: {str(e)}",
}
)


@require_GET
def get_user(request, user_id: str):
"""Fetch user data from external API with seed."""
try:
response = requests.get(f"https://randomuser.me/api/?seed={user_id}")
response = requests.get(
upstream_url("https://randomuser.me/api/"),
params={"seed": user_id},
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
)
return JsonResponse(response.json())
except Exception as e:
return JsonResponse({"error": f"Failed to fetch user: {str(e)}"}, status=500)
return JsonResponse({"results": [], "fallback": True, "error": f"Failed to fetch user: {str(e)}"})


@csrf_exempt
@require_POST
def create_post(request):
"""Create a new post via external API."""
data = {}
try:
data = json.loads(request.body)
response = requests.post(
"https://jsonplaceholder.typicode.com/posts",
upstream_url("https://jsonplaceholder.typicode.com/posts"),
json={
"title": data.get("title"),
"body": data.get("body"),
"userId": data.get("userId", 1),
},
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
)
return JsonResponse(response.json(), status=201)
except Exception as e:
return JsonResponse({"error": f"Failed to create post: {str(e)}"}, status=500)
return JsonResponse(
{
"id": -1,
"title": data.get("title", ""),
"body": data.get("body", ""),
"userId": data.get("userId", 1),
"fallback": True,
"error": f"Failed to create post: {str(e)}",
},
status=201,
)


@require_GET
def get_post(request, post_id: int):
"""Fetch post and comments in parallel using ThreadPoolExecutor."""
ctx = otel_context.get_current()

with ThreadPoolExecutor(max_workers=2) as executor:
post_future = executor.submit(
_run_with_context,
ctx,
requests.get,
f"https://jsonplaceholder.typicode.com/posts/{post_id}",
try:
ctx = otel_context.get_current()

with ThreadPoolExecutor(max_workers=2) as executor:
post_future = executor.submit(
_run_with_context,
ctx,
requests.get,
upstream_url(f"https://jsonplaceholder.typicode.com/posts/{post_id}"),
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
)
comments_future = executor.submit(
_run_with_context,
ctx,
requests.get,
upstream_url(f"https://jsonplaceholder.typicode.com/posts/{post_id}/comments"),
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
)

post_response = post_future.result()
comments_response = comments_future.result()

return JsonResponse(
{
"post": post_response.json(),
"comments": comments_response.json(),
}
)
comments_future = executor.submit(
_run_with_context,
ctx,
requests.get,
f"https://jsonplaceholder.typicode.com/posts/{post_id}/comments",
except Exception as e:
return JsonResponse(
{
"post": {},
"comments": [],
"fallback": True,
"error": f"Failed to fetch post: {str(e)}",
}
)

post_response = post_future.result()
comments_response = comments_future.result()

return JsonResponse(
{
"post": post_response.json(),
"comments": comments_response.json(),
}
)


@csrf_exempt
@require_http_methods(["DELETE"])
def delete_post(request, post_id: int):
"""Delete a post via external API."""
try:
requests.delete(f"https://jsonplaceholder.typicode.com/posts/{post_id}")
requests.delete(
upstream_url(f"https://jsonplaceholder.typicode.com/posts/{post_id}"),
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
)
return JsonResponse({"message": f"Post {post_id} deleted successfully"})
except Exception as e:
return JsonResponse({"error": f"Failed to delete post: {str(e)}"}, status=500)
return JsonResponse(
{
"message": f"Post {post_id} delete fallback",
"fallback": True,
"error": f"Failed to delete post: {str(e)}",
}
)


@require_GET
def get_activity(request):
"""Fetch a random activity suggestion."""
try:
response = requests.get("https://bored-api.appbrewery.com/random")
response = requests.get(
upstream_url("https://bored-api.appbrewery.com/random"),
timeout=EXTERNAL_HTTP_TIMEOUT_SECONDS,
)
return JsonResponse(response.json())
except Exception as e:
return JsonResponse({"error": f"Failed to fetch activity: {str(e)}"}, status=500)
return JsonResponse(
{
"activity": "Take a short walk",
"type": "relaxation",
"participants": 1,
"fallback": True,
"error": f"Failed to fetch activity: {str(e)}",
}
)


@require_GET
Expand Down
55 changes: 55 additions & 0 deletions drift/instrumentation/e2e_common/external_http.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
"""Shared external HTTP config/helpers for instrumentation e2e apps."""

from __future__ import annotations

import os
from urllib.parse import urlsplit, urlunsplit

_TRUTHY = {"1", "true", "yes", "on"}
_DEFAULT_MOCK_BASE_URL = "http://mock-upstream:8081"
_DEFAULT_TIMEOUT_SECONDS = 3.0


def use_mock_externals() -> bool:
return os.getenv("USE_MOCK_EXTERNALS", "0").strip().lower() in _TRUTHY


def mock_server_base_url() -> str:
return os.getenv("MOCK_SERVER_BASE_URL", _DEFAULT_MOCK_BASE_URL).rstrip("/")


def external_http_timeout_seconds() -> float:
raw = os.getenv("EXTERNAL_HTTP_TIMEOUT_SECONDS")
if raw is None or not raw.strip():
return _DEFAULT_TIMEOUT_SECONDS
try:
return float(raw)
except ValueError:
return _DEFAULT_TIMEOUT_SECONDS


def upstream_url(url: str) -> str:
"""Rewrite external URLs to the local mock upstream when enabled."""
if not use_mock_externals():
return url

source = urlsplit(url)
if not source.scheme or not source.netloc:
return url

target = urlsplit(mock_server_base_url())
path = source.path or "/"
return urlunsplit((target.scheme or "http", target.netloc, path, source.query, ""))


def upstream_url_parts(url: str) -> tuple[str, str, int, str]:
"""Return (scheme, host, port, path_with_query) after mock rewrite."""
rewritten = upstream_url(url)
parsed = urlsplit(rewritten)
scheme = parsed.scheme or "http"
host = parsed.hostname or ""
port = parsed.port or (443 if scheme == "https" else 80)
path = parsed.path or "/"
if parsed.query:
path = f"{path}?{parsed.query}"
return scheme, host, port, path
Loading