Skip to content

Commit 42a7cb9

Browse files
[pre-commit.ci] auto fixes from pre-commit.com hooks
for more information, see https://pre-commit.ci
1 parent f7cc658 commit 42a7cb9

File tree

3 files changed

+50
-41
lines changed

3 files changed

+50
-41
lines changed

src/workflows/recipe/__init__.py

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33
import functools
44
import logging
55
from collections.abc import Callable
6-
from opentelemetry import trace
76
from typing import Any
87

8+
from opentelemetry import trace
9+
910
from workflows.recipe.recipe import Recipe
1011
from workflows.recipe.validate import validate_recipe
1112
from workflows.recipe.wrapper import RecipeWrapper
@@ -82,27 +83,27 @@ def unwrap_recipe(header, message):
8283
environment = message.get("environment", {})
8384
if isinstance(environment, dict):
8485
recipe_id = environment.get("ID")
85-
86+
8687
# Try multiple locations where DCID might be stored
8788
top_level_params = {}
8889
if isinstance(message, dict):
8990
# Direct parameters (top-level or in recipe)
9091
top_level_params = message.get("parameters", {})
91-
92+
9293
# Payload parameters (most common location)
9394
payload = message.get("payload", {})
9495
payload_params = {}
9596
if isinstance(payload, dict):
9697
payload_params = payload.get("parameters", {})
97-
98+
9899
# Try all common locations
99100
dcid = (
100-
top_level_params.get("ispyb_dcid") or
101-
top_level_params.get("dcid") or
102-
payload_params.get("ispyb_dcid") or
103-
payload_params.get("dcid") or
104-
payload.get("ispyb_dcid") or
105-
payload.get("dcid")
101+
top_level_params.get("ispyb_dcid")
102+
or top_level_params.get("dcid")
103+
or payload_params.get("ispyb_dcid")
104+
or payload_params.get("dcid")
105+
or payload.get("ispyb_dcid")
106+
or payload.get("dcid")
106107
)
107108

108109
if dcid:
@@ -111,13 +112,15 @@ def unwrap_recipe(header, message):
111112

112113
if recipe_id:
113114
span.set_attribute("recipe_id", recipe_id)
114-
span.add_event("recipe.id_extracted", attributes={"recipe_id": recipe_id})
115+
span.add_event(
116+
"recipe.id_extracted", attributes={"recipe_id": recipe_id}
117+
)
115118

116119
# Extract span_id and trace_id for logging
117120
span_context = span.get_span_context()
118121
if span_context and span_context.is_valid:
119-
span_id = format(span_context.span_id, '016x')
120-
trace_id = format(span_context.trace_id, '032x')
122+
span_id = format(span_context.span_id, "016x")
123+
trace_id = format(span_context.trace_id, "032x")
121124

122125
log_extra = {
123126
"span_id": span_id,
@@ -128,11 +131,8 @@ def unwrap_recipe(header, message):
128131
if recipe_id:
129132
log_extra["recipe_id"] = recipe_id
130133

131-
logger.info(
132-
"Processing recipe message",
133-
extra=log_extra
134-
)
135-
134+
logger.info("Processing recipe message", extra=log_extra)
135+
136136
if log_extender and rw.environment and rw.environment.get("ID"):
137137
with log_extender("recipe_ID", rw.environment["ID"]):
138138
return callback(rw, header, message.get("payload"))

src/workflows/services/common_service.py

Lines changed: 19 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@
99
import time
1010
from typing import Any
1111

12-
import workflows
13-
import workflows.logging
14-
1512
from opentelemetry import trace
13+
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
14+
from opentelemetry.sdk.resources import SERVICE_NAME, Resource
1615
from opentelemetry.sdk.trace import TracerProvider
1716
from opentelemetry.sdk.trace.export import BatchSpanProcessor
18-
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
17+
18+
import workflows
19+
import workflows.logging
1920
from workflows.transport.middleware.otel_tracing import OTELTracingMiddleware
20-
from opentelemetry.sdk.resources import Resource, SERVICE_NAME
2121

2222

2323
class Status(enum.Enum):
@@ -192,30 +192,36 @@ def start_transport(self):
192192
self.transport.subscription_callback_set_intercept(
193193
self._transport_interceptor
194194
)
195-
195+
196196
# Configure OTELTracing
197-
resource = Resource.create({
198-
SERVICE_NAME: self._service_name,
199-
})
197+
resource = Resource.create(
198+
{
199+
SERVICE_NAME: self._service_name,
200+
}
201+
)
200202

201203
self.log.debug("Configuring OTELTracing")
202204
provider = TracerProvider(resource=resource)
203205
trace.set_tracer_provider(provider)
204206

205207
# Configure BatchProcessor and OTLPSpanExporter to point to OTELCollector
206208
otlp_exporter = OTLPSpanExporter(
207-
endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces",
208-
timeout=10
209+
endpoint="https://otel.tracing.diamond.ac.uk:4318/v1/traces", timeout=10
209210
)
210211
span_processor = BatchSpanProcessor(otlp_exporter)
211212
provider.add_span_processor(span_processor)
212213

213214
# Add OTELTracingMiddleware to the transport layer
214215
tracer = trace.get_tracer(__name__)
215-
otel_middleware = OTELTracingMiddleware(tracer, service_name=self._service_name)
216+
otel_middleware = OTELTracingMiddleware(
217+
tracer, service_name=self._service_name
218+
)
216219
self._transport.add_middleware(otel_middleware)
217220

218-
self.log.debug("OTELTracingMiddleware added to transport layer of %s", self._service_name)
221+
self.log.debug(
222+
"OTELTracingMiddleware added to transport layer of %s",
223+
self._service_name,
224+
)
219225

220226
metrics = self._environment.get("metrics")
221227
if metrics:
Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
1+
from __future__ import annotations
2+
3+
import functools
4+
from collections.abc import Callable
5+
16
from opentelemetry import trace
7+
from opentelemetry.propagate import extract
8+
29
from workflows.transport.middleware import BaseTransportMiddleware
3-
from collections.abc import Callable
4-
import functools
5-
from opentelemetry.propagate import inject, extract
10+
611

712
class OTELTracingMiddleware(BaseTransportMiddleware):
813
def __init__(self, tracer: trace.Tracer, service_name: str):
@@ -19,18 +24,16 @@ def subscribe(self, call_next: Callable, channel, callback, **kwargs) -> int:
1924
def wrapped_callback(header, message):
2025
# Extract trace context from message headers
2126
ctx = extract(header) if header else None
22-
27+
2328
# Start a new span with the extracted context
2429
with self.tracer.start_as_current_span(
25-
"transport.subscribe",
26-
context=ctx
30+
"transport.subscribe", context=ctx
2731
) as span:
2832
span.set_attribute("service_name", self.service_name)
2933
span.set_attribute("channel", channel)
30-
31-
34+
3235
# Call the original callback
3336
return callback(header, message)
34-
37+
3538
# Call the next middleware with the wrapped callback
36-
return call_next(channel, wrapped_callback, **kwargs)
39+
return call_next(channel, wrapped_callback, **kwargs)

0 commit comments

Comments
 (0)