Skip to content

Python: (core): Add functional workflow API#4238

Open
moonbox3 wants to merge 6 commits intomicrosoft:mainfrom
moonbox3:functional-workflow-api
Open

Python: (core): Add functional workflow API#4238
moonbox3 wants to merge 6 commits intomicrosoft:mainfrom
moonbox3:functional-workflow-api

Conversation

@moonbox3
Copy link
Contributor

Motivation and Context

The functional API is a stepping stone between single-agent use and the full graph API. Users write workflows as plain async functions -- no executor classes, no edges, no builder patterns.

  • Add @workflow and @step decorators for writing workflows as plain async functions
  • Native Python control flow (if/else, loops, asyncio.gather) replaces graph concepts
  • @step is opt-in: plain functions work inside @workflow without it. Use @step on expensive operations (agent calls, API requests) to save their results and skip re-execution on
    HITL resume or crash recovery
  • Streaming support via run(stream=True)
  • HITL support via ctx.request_info() with replay
  • .as_agent() wraps a functional workflow as an agent-compatible object

A very basic example of the functional workflow API:

@workflow
async def pipeline(data: str) -> str:
    upper = await to_upper(data)
    return await reverse(upper)

result = await pipeline.run("hello")
print(result.get_outputs())  # ['OLLEH']

Note: @step is opt-in for functions where per-step checkpointing matters (for example, agent calls). Without @step, workflows still support HITL and checkpointing — functions just re-execute on resume.

@step
async def call_agent(prompt: str) -> str:
    return (await agent.run(prompt)).text

@workflow
async def pipeline(data: str, ctx: RunContext) -> str:
    result = await call_agent(data)        # saved by @step
    validated = await validate(result)      # plain function, re-runs on resume
    feedback = await ctx.request_info(...)  # HITL pause
    return await finalize(result, feedback)

ctx: RunContext is only needed when you use HITL (request_info), custom events (add_event), or state (get_state/set_state). Otherwise, omit it for a cleaner signature.

Description

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.

Copilot AI review requested due to automatic review settings February 25, 2026 08:40
@markwallace-microsoft markwallace-microsoft added documentation Improvements or additions to documentation python labels Feb 25, 2026
@markwallace-microsoft
Copy link
Member

markwallace-microsoft commented Feb 25, 2026

Python Test Coverage

Python Test Coverage Report •
FileStmtsMissCoverMissing
packages/core/agent_framework/_workflows
   _functional.py3431495%688, 713, 716–717, 725–726, 786–787, 793, 1073–1074, 1076, 1096, 1098
TOTAL22151277887% 

Python Unit Test Overview

Tests Skipped Failures Errors Time
4624 244 💤 0 ❌ 0 🔥 1m 18s ⏱️

Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a functional workflow API as an alternative to the existing graph-based workflow API. The functional approach allows users to write workflows as plain async functions decorated with @workflow, using native Python control flow (if/else, loops, asyncio.gather) instead of explicit graph construction with executors and edges. The @step decorator is optional and provides per-step checkpointing, caching, and observability.

Changes:

  • Added core implementation (_functional.py) with @workflow, @step decorators, RunContext, FunctionalWorkflow, and FunctionalWorkflowAgent classes
  • Added comprehensive test suite (40+ test cases covering basic execution, HITL, checkpointing, streaming, error handling, edge cases)
  • Added 6 sample files demonstrating functional workflows (basic pipeline, streaming, parallel execution, checkpointing, HITL, agent integration)
  • Restructured getting-started samples to introduce functional workflows before graph workflows
  • Updated exports in __init__.py to expose new functional API symbols

Reviewed changes

Copilot reviewed 13 out of 14 changed files in this pull request and generated 2 comments.

Show a summary per file
File Description
python/packages/core/agent_framework/_workflows/_functional.py Core implementation of functional workflow API with RunContext, StepWrapper, FunctionalWorkflow, and FunctionalWorkflowAgent classes (1105 lines)
python/packages/core/agent_framework/__init__.py Added exports for FunctionalWorkflow, FunctionalWorkflowAgent, RunContext, StepWrapper, step, and workflow
python/packages/core/tests/workflow/test_functional_workflow.py Comprehensive test suite covering basic execution, events, parallelism, HITL, errors, streaming, state, checkpointing, control flow, and edge cases (1031 lines)
python/samples/01-get-started/05_first_functional_workflow.py Getting started sample demonstrating basic functional workflow with plain async functions
python/samples/01-get-started/06_first_graph_workflow.py Renamed and updated graph workflow sample (previously 05_first_workflow.py)
python/samples/01-get-started/07_host_your_agent.py Renamed agent hosting sample (previously 06_host_your_agent.py)
python/samples/01-get-started/README.md Updated sample listing to include both functional and graph workflow samples
python/samples/03-workflows/functional/basic_pipeline.py Sample showing simplest sequential pipeline with @workflow decorator
python/samples/03-workflows/functional/basic_streaming_pipeline.py Sample demonstrating streaming workflow events with run(stream=True)
python/samples/03-workflows/functional/parallel_pipeline.py Sample showing fan-out/fan-in with asyncio.gather
python/samples/03-workflows/functional/steps_and_checkpointing.py Sample explaining @step decorator for per-step checkpointing and observability
python/samples/03-workflows/functional/hitl_review.py Sample demonstrating HITL with ctx.request_info() and resume
python/samples/03-workflows/functional/agent_integration.py Sample showing agent calls inside workflows and .as_agent() wrapper
python/samples/03-workflows/README.md Added functional workflow section to samples overview

@workflow
async def data_pipeline(url: str) -> str:
"""A simple sequential data pipeline."""
raw = await fetch_data(url)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be usefull to also demonstrate that because this is just a function, you do not have to wrap everything in steps, you can do some of the manipulation just as simple code between steps, making it a lot simpler

return f"Draft document about '{topic}': Lorem ipsum dolor sit amet..."


@step
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how this does step compare to handler is there a lot of overlap and could we reuse steps in graphs, or handler here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want them to be conceptually different things. A @handler handles a message routed to it by the graph: it's reactive, tied to the executor contract. A @step marks a function call in a sequential flow: it's proactive, just "I called this function as step N." Different mental models, different names.

print(f"State: {result1.get_final_state()}")
assert result1.get_final_state() == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS

requests = result1.get_request_info_events()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what would happen here if state != WorkflowRunState.IDLE_WITH_PENDING_REQUESTS?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If request_info() was never reached (an early return), the workflow completes normally with state IDLE and get_request_info_events() returns an empty list. Added a comment in the sample clarifying this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so requests would be None or would the get... call raise?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_request_info_events() returns an empty list. It's a filter over the event stream, so no requests means []. No exception raised.

``asyncio.gather``) instead of a graph-based topology.

A ``@workflow``-decorated async function receives its input as the first
positional argument. If the function needs HITL (``request_info``), custom
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should it be:

Suggested change
positional argument. If the function needs HITL (``request_info``), custom
positional argument. If a step needs HITL (``request_info``), custom

?

A ``@workflow``-decorated async function receives its input as the first
positional argument. If the function needs HITL (``request_info``), custom
events, or key/value state, add a :class:`RunContext` parameter — otherwise it
can be omitted. Inside the function, plain ``async`` calls run normally.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be:

Suggested change
can be omitted. Inside the function, plain ``async`` calls run normally.
can be omitted. Inside the workflow, plain ``async`` calls run normally.

?

class RunContext:
"""Execution context injected into ``@workflow`` functions.

Every ``@workflow`` invocation receives a ``RunContext`` instance that
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a bit confused on which entity receives the context. Is it the workflow or the steps or both?

Comment on lines +114 to +117
@workflow
async def hitl_pipeline(data: str, ctx: RunContext) -> str:
feedback = await ctx.request_info({"draft": data}, response_type=str)
return feedback
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My brain maps @workflow to the graph-based Workflow and @step to Executor. I can see the benefit of allowing request_info at the workflow level. It's kind of like an executor whose sole purpose is to get user feedback. But should we also allow request_info inside a @step?


On first execution this suspends the workflow by raising an internal
``WorkflowInterrupted`` signal (caught by the framework, never exposed
to user code). The caller receives a ``WorkflowRunResult`` whose
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The caller receives a WorkflowRunResult

What about streaming?

def __init__(self, func: Callable[..., Awaitable[R]], *, name: str | None = None) -> None:
if not inspect.iscoroutinefunction(func):
raise TypeError(
f"@step can only decorate async functions, but '{func.__name__}' is not a coroutine function."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is probably not super important but should we also support not async methods?

functools.update_wrapper(self, func) # type: ignore[arg-type]

# ------------------------------------------------------------------
# run() — same overloaded interface as graph Workflow
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we extract these methods and make both workflow types get them for free?

self._last_step_cache = dict(ctx._step_cache)

# Yield collected events
for event in ctx._get_events():
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Is this true streaming?

It looks like all events have been produced at this point.

cache_key = ctx._get_step_cache_key(self.name)
found, cached = ctx._get_cached_result(cache_key)
invocation_data = deepcopy({"args": args, "kwargs": kwargs}) if args or kwargs else None
if found:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also check if the input arguments have the same values?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There could be scenarios where some steps are checkpointed and some are not (the ones without the decorator). If a checkpointed step depends on the output of a non-checkpointed step and its output changes, it may lead to incorrect results.



# Plain async functions — no decorators needed
async def to_upper_case(text: str) -> str:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this sample will become even simpler if we can remove the asyncs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

documentation Improvements or additions to documentation python

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Python: Add functional workflow API

5 participants