Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
249 changes: 155 additions & 94 deletions burr/core/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -338,31 +338,44 @@ def _run_single_step_streaming_action(
result = None
state_update = None
count = 0
for item in generator:
if not isinstance(item, tuple):
# TODO -- consider adding support for just returning a result.
raise ValueError(
f"Action {action.name} must yield a tuple of (result, state_update). "
f"For all non-final results (intermediate),"
f"the state update must be None"
)
result, state_update = item
count += 1
try:
for item in generator:
if not isinstance(item, tuple):
# TODO -- consider adding support for just returning a result.
raise ValueError(
f"Action {action.name} must yield a tuple of (result, state_update). "
f"For all non-final results (intermediate),"
f"the state update must be None"
)
result, state_update = item
if state_update is None:
count += 1
if first_stream_start_time is None:
first_stream_start_time = system.now()
lifecycle_adapters.call_all_lifecycle_hooks_sync(
"post_stream_item",
item=result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
yield result, None
except Exception as e:
if state_update is None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
lifecycle_adapters.call_all_lifecycle_hooks_sync(
"post_stream_item",
item=result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
yield result, None
raise
logger.warning(
"Streaming action '%s' raised %s after yielding %d items. "
"Proceeding with final state from generator cleanup. Original error: %s",
action.name,
type(e).__name__,
count,
e,
exc_info=True,
)

if state_update is None:
raise ValueError(
Expand Down Expand Up @@ -391,31 +404,45 @@ async def _arun_single_step_streaming_action(
result = None
state_update = None
count = 0
async for item in generator:
if not isinstance(item, tuple):
# TODO -- consider adding support for just returning a result.
raise ValueError(
f"Action {action.name} must yield a tuple of (result, state_update). "
f"For all non-final results (intermediate),"
f"the state update must be None"
)
result, state_update = item
try:
async for item in generator:
if not isinstance(item, tuple):
# TODO -- consider adding support for just returning a result.
raise ValueError(
f"Action {action.name} must yield a tuple of (result, state_update). "
f"For all non-final results (intermediate),"
f"the state update must be None"
)
result, state_update = item
if state_update is None:
count += 1
if first_stream_start_time is None:
first_stream_start_time = system.now()
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
"post_stream_item",
item=result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
yield result, None
except Exception as e:
if state_update is None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
"post_stream_item",
item=result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield result, None
raise
logger.warning(
"Streaming action '%s' raised %s after yielding %d items. "
"Proceeding with final state from generator cleanup. Original error: %s",
action.name,
type(e).__name__,
count,
e,
exc_info=True,
)

if state_update is None:
raise ValueError(
f"Action {action.name} did not return a state update. For async actions, the last yield "
Expand Down Expand Up @@ -450,28 +477,42 @@ def _run_multi_step_streaming_action(
result = None
first_stream_start_time = None
count = 0
for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
if next_result is not None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
lifecycle_adapters.call_all_lifecycle_hooks_sync(
"post_stream_item",
item=next_result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield next_result, None
try:
for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
if next_result is not None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
lifecycle_adapters.call_all_lifecycle_hooks_sync(
"post_stream_item",
item=next_result,
item_index=count,
stream_initialize_time=stream_initialize_time,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield next_result, None
except Exception as e:
if result is None:
raise
logger.warning(
"Streaming action '%s' raised %s after yielding %d items. "
"Proceeding with last yielded result for reducer. "
"Note: the reducer will run on potentially partial data. Original error: %s",
action.name,
type(e).__name__,
count,
e,
exc_info=True,
)
state_update = _run_reducer(action, state, result, action.name)
_validate_result(result, action.name, action.schema)
_validate_reducer_writes(action, state_update, action.name)
Expand All @@ -494,28 +535,42 @@ async def _arun_multi_step_streaming_action(
result = None
first_stream_start_time = None
count = 0
async for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
if next_result is not None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
"post_stream_item",
item=next_result,
stream_initialize_time=stream_initialize_time,
item_index=count,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield next_result, None
try:
async for item in generator:
# We want to peek ahead so we can return the last one
# This is slightly eager, but only in the case in which we
# are using a multi-step streaming action
next_result = result
result = item
if next_result is not None:
if first_stream_start_time is None:
first_stream_start_time = system.now()
await lifecycle_adapters.call_all_lifecycle_hooks_sync_and_async(
"post_stream_item",
item=next_result,
stream_initialize_time=stream_initialize_time,
item_index=count,
first_stream_item_start_time=first_stream_start_time,
action=action.name,
app_id=app_id,
partition_key=partition_key,
sequence_id=sequence_id,
)
count += 1
yield next_result, None
except Exception as e:
if result is None:
raise
logger.warning(
"Streaming action '%s' raised %s after yielding %d items. "
"Proceeding with last yielded result for reducer. "
"Note: the reducer will run on potentially partial data. Original error: %s",
action.name,
type(e).__name__,
count,
e,
exc_info=True,
)
state_update = _run_reducer(action, state, result, action.name)
_validate_result(result, action.name, action.schema)
_validate_reducer_writes(action, state_update, action.name)
Expand Down Expand Up @@ -1871,7 +1926,9 @@ def stream_iterate(
halt_before: Optional[Union[str, List[str]]] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> Generator[
Tuple[Action, StreamingResultContainer[ApplicationStateType, Union[dict, Any]]], None, None
Tuple[Action, StreamingResultContainer[ApplicationStateType, Union[dict, Any]]],
None,
None,
]:
"""Produces an iterator that iterates through intermediate streams. You may want
to use this in something like deep research mode in which:
Expand Down Expand Up @@ -1915,7 +1972,11 @@ async def astream_iterate(
halt_before: Optional[Union[str, List[str]]] = None,
inputs: Optional[Dict[str, Any]] = None,
) -> AsyncGenerator[
Tuple[Action, AsyncStreamingResultContainer[ApplicationStateType, Union[dict, Any]]], None
Tuple[
Action,
AsyncStreamingResultContainer[ApplicationStateType, Union[dict, Any]],
],
None,
]:
"""Async version of stream_iterate. Produces an async generator that iterates
through intermediate streams. See stream_iterate for more details.
Expand Down
Loading
Loading