Skip to content

Commit 3b51ef2

Browse files
harsh543 and claude
committed
test: improve workflow logging test structure and cleanup
- Add try/finally to restore temporal_extra_mode and full_workflow_info_on_extra
- Add clear phase comments: first execution, replay path, replay assertions
- Single parameterized test validates: extra mode formatting (dict/flatten), replay suppression, and full_workflow_info_on_extra

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 3e29635 commit 3b51ef2

File tree

1 file changed

+78
-129
lines changed

1 file changed

+78
-129
lines changed

tests/worker/test_workflow.py

Lines changed: 78 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -1991,88 +1991,18 @@ def last_signal(self) -> str:
19911991
return self._last_signal
19921992

19931993

1994-
async def test_workflow_logging(client: Client):
1995-
workflow.logger.full_workflow_info_on_extra = True
1996-
with LogCapturer().logs_captured(
1997-
workflow.logger.base_logger, activity.logger.base_logger
1998-
) as capturer:
1999-
# Log two signals and kill worker before completing. Need to disable
2000-
# workflow cache since we restart the worker and don't want to pay the
2001-
# sticky queue penalty.
2002-
async with new_worker(
2003-
client, LoggingWorkflow, max_cached_workflows=0
2004-
) as worker:
2005-
handle = await client.start_workflow(
2006-
LoggingWorkflow.run,
2007-
id=f"workflow-{uuid.uuid4()}",
2008-
task_queue=worker.task_queue,
2009-
)
2010-
# Send some signals and updates
2011-
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
2012-
await handle.signal(LoggingWorkflow.my_signal, "signal 2")
2013-
await handle.execute_update(
2014-
LoggingWorkflow.my_update, "update 1", id="update-1"
2015-
)
2016-
await handle.execute_update(
2017-
LoggingWorkflow.my_update, "update 2", id="update-2"
2018-
)
2019-
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)
2020-
2021-
# Confirm logs were produced
2022-
assert capturer.find_log("Signal: signal 1 ({'attempt':")
2023-
assert capturer.find_log("Signal: signal 2")
2024-
assert capturer.find_log("Update: update 1")
2025-
assert capturer.find_log("Update: update 2")
2026-
assert capturer.find_log("Query called")
2027-
assert not capturer.find_log("Signal: signal 3")
2028-
# Also make sure it has some workflow info and correct funcName
2029-
record = capturer.find_log("Signal: signal 1")
2030-
assert (
2031-
record
2032-
and record.__dict__["temporal_workflow"]["workflow_type"]
2033-
== "LoggingWorkflow"
2034-
and record.funcName == "my_signal"
2035-
)
2036-
# Since we enabled full info, make sure it's there
2037-
assert isinstance(record.__dict__["workflow_info"], workflow.Info)
2038-
# Check the log emitted by the update execution.
2039-
record = capturer.find_log("Update: update 1")
2040-
assert (
2041-
record
2042-
and record.__dict__["temporal_workflow"]["update_id"] == "update-1"
2043-
and record.__dict__["temporal_workflow"]["update_name"] == "my_update"
2044-
and "'update_id': 'update-1'" in record.message
2045-
and "'update_name': 'my_update'" in record.message
2046-
)
2047-
2048-
# Clear queue and start a new one with more signals
2049-
capturer.log_queue.queue.clear()
2050-
async with new_worker(
2051-
client,
2052-
LoggingWorkflow,
2053-
task_queue=worker.task_queue,
2054-
max_cached_workflows=0,
2055-
) as worker:
2056-
# Send signals and updates
2057-
await handle.signal(LoggingWorkflow.my_signal, "signal 3")
2058-
await handle.signal(LoggingWorkflow.my_signal, "finish")
2059-
await handle.result()
2060-
2061-
# Confirm replayed logs are not present but new ones are
2062-
assert not capturer.find_log("Signal: signal 1")
2063-
assert not capturer.find_log("Signal: signal 2")
2064-
assert capturer.find_log("Signal: signal 3")
2065-
assert capturer.find_log("Signal: finish")
2066-
2067-
2068-
async def test_workflow_logging_flatten_mode(client: Client):
2069-
"""Test that flatten mode produces OTel-safe scalar attributes."""
2070-
# Save original mode and set to flatten
1994+
@pytest.mark.parametrize("temporal_extra_mode", ["dict", "flatten"])
1995+
async def test_workflow_logging(client: Client, temporal_extra_mode: str):
1996+
"""Test workflow logging: extra mode formatting, replay suppression, and full_workflow_info."""
20711997
original_mode = workflow.logger.temporal_extra_mode
2072-
workflow.logger.temporal_extra_mode = "flatten"
1998+
original_full_info = workflow.logger.full_workflow_info_on_extra
1999+
workflow.logger.temporal_extra_mode = temporal_extra_mode
2000+
workflow.logger.full_workflow_info_on_extra = True
20732001

20742002
try:
20752003
with LogCapturer().logs_captured(workflow.logger.base_logger) as capturer:
2004+
# --- First execution: logs should appear ---
2005+
# Disable workflow cache so worker restart triggers replay
20762006
async with new_worker(
20772007
client, LoggingWorkflow, max_cached_workflows=0
20782008
) as worker:
@@ -2082,41 +2012,75 @@ async def test_workflow_logging_flatten_mode(client: Client):
20822012
task_queue=worker.task_queue,
20832013
)
20842014
await handle.signal(LoggingWorkflow.my_signal, "signal 1")
2015+
await handle.signal(LoggingWorkflow.my_signal, "signal 2")
20852016
await handle.execute_update(
20862017
LoggingWorkflow.my_update, "update 1", id="update-1"
20872018
)
2088-
await handle.signal(LoggingWorkflow.my_signal, "finish")
2089-
await handle.result()
2019+
assert "signal 2" == await handle.query(LoggingWorkflow.last_signal)
20902020

2091-
# Check signal log record
2021+
# Verify logs from first execution
20922022
record = capturer.find_log("Signal: signal 1")
20932023
assert record is not None
2024+
assert record.funcName == "my_signal"
2025+
assert capturer.find_log("Signal: signal 2")
2026+
assert capturer.find_log("Query called")
20942027

2095-
# Should NOT have nested dict
2096-
assert "temporal_workflow" not in record.__dict__
2097-
2098-
# Should have flattened keys with temporal.workflow prefix
2099-
assert record.__dict__["temporal.workflow.workflow_type"] == "LoggingWorkflow"
2100-
assert "temporal.workflow.workflow_id" in record.__dict__
2101-
assert "temporal.workflow.run_id" in record.__dict__
2102-
assert "temporal.workflow.namespace" in record.__dict__
2103-
assert "temporal.workflow.task_queue" in record.__dict__
2104-
assert record.__dict__["temporal.workflow.attempt"] == 1
2105-
2106-
# Verify all temporal.workflow.* values are primitives (OTel-safe)
2107-
for key, value in record.__dict__.items():
2108-
if key.startswith("temporal.workflow."):
2109-
assert isinstance(
2110-
value, (str, int, float, bool, type(None))
2111-
), f"Key {key} has non-primitive value: {type(value)}"
2112-
2113-
# Check update log record has flattened update fields
21142028
update_record = capturer.find_log("Update: update 1")
21152029
assert update_record is not None
2116-
assert update_record.__dict__["temporal.workflow.update_id"] == "update-1"
2117-
assert update_record.__dict__["temporal.workflow.update_name"] == "my_update"
2030+
2031+
# Verify full_workflow_info_on_extra
2032+
assert isinstance(record.__dict__["workflow_info"], workflow.Info)
2033+
2034+
# Verify extra mode formatting
2035+
if temporal_extra_mode == "dict":
2036+
# Dict mode appends context to message and uses nested dict
2037+
assert "({'attempt':" in record.message
2038+
assert record.__dict__["temporal_workflow"]["workflow_type"] == "LoggingWorkflow"
2039+
assert update_record.__dict__["temporal_workflow"]["update_id"] == "update-1"
2040+
assert update_record.__dict__["temporal_workflow"]["update_name"] == "my_update"
2041+
assert "'update_id': 'update-1'" in update_record.message
2042+
else:
2043+
# Flatten mode uses OTel-safe scalar attributes
2044+
assert "temporal_workflow" not in record.__dict__
2045+
assert record.__dict__["temporal.workflow.workflow_type"] == "LoggingWorkflow"
2046+
assert "temporal.workflow.workflow_id" in record.__dict__
2047+
assert "temporal.workflow.run_id" in record.__dict__
2048+
assert "temporal.workflow.namespace" in record.__dict__
2049+
assert "temporal.workflow.task_queue" in record.__dict__
2050+
assert record.__dict__["temporal.workflow.attempt"] == 1
2051+
assert update_record.__dict__["temporal.workflow.update_id"] == "update-1"
2052+
assert update_record.__dict__["temporal.workflow.update_name"] == "my_update"
2053+
2054+
# Verify all temporal.workflow.* values are primitives (OTel-safe)
2055+
for key, value in record.__dict__.items():
2056+
if key.startswith("temporal.workflow."):
2057+
assert isinstance(
2058+
value, (str, int, float, bool, type(None))
2059+
), f"Key {key} has non-primitive value: {type(value)}"
2060+
2061+
# --- Clear logs and continue execution (replay path) ---
2062+
# When the new worker starts, it replays the workflow history (signals 1 & 2).
2063+
# Replay suppression should prevent those logs from appearing again.
2064+
capturer.log_queue.queue.clear()
2065+
2066+
async with new_worker(
2067+
client,
2068+
LoggingWorkflow,
2069+
task_queue=worker.task_queue,
2070+
max_cached_workflows=0,
2071+
) as worker:
2072+
await handle.signal(LoggingWorkflow.my_signal, "signal 3")
2073+
await handle.signal(LoggingWorkflow.my_signal, "finish")
2074+
await handle.result()
2075+
2076+
# --- Replay execution: no duplicate logs ---
2077+
assert not capturer.find_log("Signal: signal 1")
2078+
assert not capturer.find_log("Signal: signal 2")
2079+
assert capturer.find_log("Signal: signal 3")
2080+
assert capturer.find_log("Signal: finish")
21182081
finally:
21192082
workflow.logger.temporal_extra_mode = original_mode
2083+
workflow.logger.full_workflow_info_on_extra = original_full_info
21202084

21212085

21222086
@activity.defn
@@ -2180,6 +2144,18 @@ async def test_workflow_logging_task_fail(client: Client):
21802144
== "task_fail_once_activity"
21812145
)
21822146

2147+
def workflow_failure_with_identifier(l: logging.LogRecord):
2148+
if (
2149+
hasattr(l, "__temporal_error_identifier")
2150+
and getattr(l, "__temporal_error_identifier")
2151+
== "WorkflowTaskFailure"
2152+
):
2153+
assert l.msg.startswith("Failed activation on workflow")
2154+
return True
2155+
return False
2156+
2157+
assert capturer.find(workflow_failure_with_identifier) is not None
2158+
21832159

21842160
@workflow.defn
21852161
class StackTraceWorkflow:
@@ -8039,33 +8015,6 @@ async def test_quick_activity_swallows_cancellation(client: Client):
80398015
assert cause.message == "Workflow cancelled"
80408016

80418017

8042-
async def test_workflow_logging_trace_identifier(client: Client):
8043-
with LogCapturer().logs_captured(
8044-
temporalio.worker._workflow_instance.logger
8045-
) as capturer:
8046-
async with new_worker(
8047-
client,
8048-
TaskFailOnceWorkflow,
8049-
activities=[task_fail_once_activity],
8050-
) as worker:
8051-
await client.execute_workflow(
8052-
TaskFailOnceWorkflow.run,
8053-
id="workflow_failure_trace_identifier",
8054-
task_queue=worker.task_queue,
8055-
)
8056-
8057-
def workflow_failure(l: logging.LogRecord):
8058-
if (
8059-
hasattr(l, "__temporal_error_identifier")
8060-
and getattr(l, "__temporal_error_identifier") == "WorkflowTaskFailure"
8061-
):
8062-
assert l.msg.startswith("Failed activation on workflow")
8063-
return True
8064-
return False
8065-
8066-
assert capturer.find(workflow_failure) is not None
8067-
8068-
80698018
@activity.defn
80708019
def use_in_workflow() -> bool:
80718020
return workflow.in_workflow()

0 commit comments

Comments (0)