Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
[pytest]
addopts = -n auto
asyncio_mode = auto
172 changes: 170 additions & 2 deletions src/humanloop/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@

from humanloop.base_client import AsyncBaseHumanloop, BaseHumanloop
from humanloop.core.client_wrapper import SyncClientWrapper
from humanloop.decorators.flow import flow as flow_decorator_factory
from humanloop.decorators.prompt import prompt_decorator_factory
from humanloop.decorators.flow import a_flow_decorator_factory as a_flow_decorator_factory
from humanloop.decorators.flow import flow_decorator_factory as flow_decorator_factory
from humanloop.decorators.prompt import a_prompt_decorator_factory, prompt_decorator_factory
from humanloop.decorators.tool import a_tool_decorator_factory as a_tool_decorator_factory
from humanloop.decorators.tool import tool_decorator_factory as tool_decorator_factory
from humanloop.environment import HumanloopEnvironment
from humanloop.evals import run_eval
Expand Down Expand Up @@ -273,6 +275,50 @@ def call_llm(messages):
"""
return prompt_decorator_factory(path=path)

def a_prompt(
    self,
    *,
    path: str,
):
    """Auto-instrument LLM providers called inside an async function and create
    [Prompt](https://humanloop.com/docs/explanation/prompts) Logs on Humanloop
    from them.

    Async counterpart of :meth:`prompt`.

    ```python
    @a_prompt(path="My Async Prompt")
    async def call_llm_async(messages):
        client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        response = await client.chat.completions.create(
            model="gpt-4o",
            temperature=0.8,
            frequency_penalty=0.5,
            max_tokens=200,
            messages=messages,
        )
        return response.choices[0].message.content

    # Calling the function above creates a new Log on Humanloop
    # against this Prompt version:
    # {
    #     provider: "openai",
    #     model: "gpt-4o",
    #     endpoint: "chat",
    #     max_tokens: 200,
    #     temperature: 0.8,
    #     frequency_penalty: 0.5,
    # }
    ```

    If a different model, endpoint, or hyperparameter is used, a new
    Prompt version is created.

    :param path: The path where the Prompt File is created in your
        Humanloop organization workspace.
    """
    return a_prompt_decorator_factory(path=path)

def tool(
self,
*,
Expand Down Expand Up @@ -331,6 +377,64 @@ def calculator(a: int, b: Optional[int]) -> int:
setup_values=setup_values,
)

def a_tool(
    self,
    *,
    path: str,
    attributes: Optional[dict[str, Any]] = None,
    setup_values: Optional[dict[str, Any]] = None,
):
    """Manage an async [Tool](https://humanloop.com/docs/explanation/tools) File
    through code.

    The returned decorator inspects the source of the wrapped async function
    to derive the Tool's JSON Schema; whenever the function declaration
    changes, a new Tool version with the updated schema is upserted.

    For example:

    ```python
    # Adding @a_tool on this function
    @humanloop_client.a_tool(path="async_calculator")
    async def async_calculator(a: int, b: Optional[int]) -> int:
        \"\"\"Add two numbers together asynchronously.\"\"\"
        return a + b

    # Creates a Tool with this JSON Schema:
    {
        strict: True,
        function: {
            "name": "async_calculator",
            "description": "Add two numbers together asynchronously.",
            "parameters": {
                type: "object",
                properties: {
                    a: {type: "integer"},
                    b: {type: "integer"}
                },
                required: ["a"],
            },
        }
    }
    ```

    The decorated function must return a JSON-serializable value. If it
    raises, the created Log has `output` set to null and the `error` field
    populated.

    :param path: The path of the File in the Humanloop workspace.

    :param setup_values: Values needed to setup the Tool, defined in [JSON Schema](https://json-schema.org/)

    :param attributes: Additional fields to describe the Tool. Helpful to separate Tool versions from each other with details on how they were created or used.
    """
    tracer = self._opentelemetry_tracer
    return a_tool_decorator_factory(
        opentelemetry_tracer=tracer,
        path=path,
        attributes=attributes,
        setup_values=setup_values,
    )

def flow(
self,
*,
Expand Down Expand Up @@ -394,6 +498,70 @@ def agent():
attributes=attributes,
)

def a_flow(
    self,
    *,
    path: str,
    attributes: Optional[dict[str, Any]] = None,
):
    """Trace SDK logging calls through [Flows](https://humanloop.com/docs/explanation/flows) for async functions.

    Use it as the entrypoint of your async LLM feature. Logging calls like `prompts.call(...)`,
    `tools.call(...)`, or other Humanloop decorators will be automatically added to the trace.

    For example:

    ```python
    @humanloop_client.a_prompt(path="My Async Prompt")
    async def call_llm_async(messages):
        client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
        response = await client.chat.completions.create(
            model="gpt-4o",
            temperature=0.8,
            frequency_penalty=0.5,
            max_tokens=200,
            messages=messages,
        )
        return response.choices[0].message.content

    @humanloop_client.a_flow(path="My Agent", attributes={"version": "v1"})
    async def async_agent():
        while True:
            messages = []
            user_input = input("You: ")
            if user_input == "exit":
                break
            messages.append({"role": "user", "content": user_input})
            response = await call_llm_async(messages)
            messages.append({"role": "assistant", "content": response})
            print(f"Assistant: {response}")
    ```

    Each call to async_agent will create a trace corresponding to the conversation
    session. Multiple Prompt Logs will be created as the LLM is called. They
    will be added to the trace, allowing you to see the whole conversation
    in the UI.

    If the function returns a ChatMessage-like object, the Log will
    populate the `output_message` field. Otherwise, it will serialize
    the return value and populate the `output` field.

    If an exception is raised, the output fields will be set to None
    and the error message will be set in the Log's `error` field.

    :param path: The path to the Flow File in your Humanloop organization
        workspace.

    :param attributes: Additional fields to describe the Flow. Helpful to separate Flow versions from each other with details on how they were created or used.
    """
    return a_flow_decorator_factory(
        client=self,
        opentelemetry_tracer=self._opentelemetry_tracer,
        path=path,
        attributes=attributes,
    )

def pull(self, path: Optional[str] = None, environment: Optional[str] = None) -> Tuple[List[str], List[str]]:
"""Pull Prompt and Agent files from Humanloop to local filesystem.

Expand Down
5 changes: 3 additions & 2 deletions src/humanloop/context.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import threading
from contextlib import contextmanager
from dataclasses import dataclass
import threading
from typing import Any, Callable, Generator, Literal, Optional

from opentelemetry import context as context_api

from humanloop.error import HumanloopRuntimeError
from humanloop.otel.constants import (
HUMANLOOP_CONTEXT_EVALUATION,
HUMANLOOP_CONTEXT_DECORATOR,
HUMANLOOP_CONTEXT_EVALUATION,
HUMANLOOP_CONTEXT_TRACE_ID,
)

Expand Down
114 changes: 107 additions & 7 deletions src/humanloop/decorators/flow.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import logging
from functools import wraps
from typing import Any, Callable, Optional, TypeVar
from typing_extensions import ParamSpec
from typing import Any, Awaitable, Callable, Optional, TypeVar

from opentelemetry.trace import Span, Tracer
from typing_extensions import ParamSpec

from humanloop.base_client import BaseHumanloop
from humanloop.context import (
Expand All @@ -12,18 +12,18 @@
set_decorator_context,
set_trace_id,
)
from humanloop.evals.run import HumanloopRuntimeError
from humanloop.types.chat_message import ChatMessage
from humanloop.decorators.helpers import bind_args
from humanloop.evals.run import HumanloopRuntimeError
from humanloop.evals.types import FileEvalConfig
from humanloop.otel.constants import (
HUMANLOOP_FILE_TYPE_KEY,
HUMANLOOP_LOG_KEY,
HUMANLOOP_FILE_PATH_KEY,
HUMANLOOP_FILE_TYPE_KEY,
HUMANLOOP_FLOW_SPAN_NAME,
HUMANLOOP_LOG_KEY,
)
from humanloop.otel.helpers import process_output, write_to_opentelemetry_span
from humanloop.requests import FlowKernelRequestParams as FlowDict
from humanloop.types.chat_message import ChatMessage
from humanloop.types.flow_log_response import FlowLogResponse

logger = logging.getLogger("humanloop.sdk")
Expand All @@ -33,7 +33,7 @@
R = TypeVar("R")


def flow(
def flow_decorator_factory(
client: "BaseHumanloop",
opentelemetry_tracer: Tracer,
path: str,
Expand Down Expand Up @@ -131,3 +131,103 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
return wrapper

return decorator


def a_flow_decorator_factory(
    client: "BaseHumanloop",
    opentelemetry_tracer: Tracer,
    path: str,
    attributes: Optional[dict[str, Any]] = None,
):
    """Build a decorator that traces an async function as a Humanloop Flow.

    Async counterpart of ``flow_decorator_factory``: each call to the wrapped
    coroutine creates an "incomplete" Flow Log up front (so nested logging
    calls can reference it as their trace parent), awaits the function, then
    records the final output/error on the OpenTelemetry span for export.

    :param client: SDK client used to create/delete Flow Logs.
    :param opentelemetry_tracer: Tracer the Flow span is started on.
    :param path: Path of the Flow File; falls back to the function name if falsy.
    :param attributes: Version attributes that define the Flow kernel.
    """
    # Flow version ("kernel") shared by every invocation of the decorated function.
    flow_kernel = {"attributes": attributes or {}}

    def decorator(func: Callable[P, Awaitable[R]]) -> Callable[P, Awaitable[Optional[R]]]:
        # Fall back to the function name when no explicit path was given.
        decorator_path = path or func.__name__
        file_type = "flow"

        @wraps(func)
        async def wrapper(*args: P.args, **kwargs: P.kwargs) -> Optional[R]:
            span: Span
            # Make the Flow decorator context visible to nested decorators/calls.
            with set_decorator_context(
                DecoratorContext(
                    path=decorator_path,
                    type="flow",
                    version=flow_kernel,
                )
            ) as decorator_context:
                with opentelemetry_tracer.start_as_current_span(HUMANLOOP_FLOW_SPAN_NAME) as span:  # type: ignore
                    span.set_attribute(HUMANLOOP_FILE_PATH_KEY, decorator_path)
                    span.set_attribute(HUMANLOOP_FILE_TYPE_KEY, file_type)
                    # Parent trace id, if this Flow is itself nested in another trace.
                    trace_id = get_trace_id()
                    func_args = bind_args(func, args, kwargs)

                    # Create the trace ahead so we have a parent ID to reference
                    init_log_inputs = {
                        "inputs": {k: v for k, v in func_args.items() if k != "messages"},
                        "messages": func_args.get("messages"),
                        "trace_parent_id": trace_id,
                    }
                    this_flow_log: FlowLogResponse = client.flows._log(  # type: ignore [attr-defined]
                        path=decorator_context.path,
                        flow=decorator_context.version,
                        log_status="incomplete",
                        **init_log_inputs,
                    )

                    # Nested logging calls made while awaiting func attach to this Log's trace.
                    with set_trace_id(this_flow_log.id):
                        func_output: Optional[R]
                        log_output: Optional[str]
                        log_error: Optional[str]
                        log_output_message: Optional[ChatMessage]
                        try:
                            func_output = await func(*args, **kwargs)
                            # A dict with exactly {"role", "content"} is treated as a
                            # ChatMessage and logged via output_message instead of output.
                            if (
                                isinstance(func_output, dict)
                                and len(func_output.keys()) == 2
                                and "role" in func_output
                                and "content" in func_output
                            ):
                                log_output_message = func_output  # type: ignore [assignment]
                                log_output = None
                            else:
                                log_output = process_output(func=func, output=func_output)
                                log_output_message = None
                            log_error = None
                        except HumanloopRuntimeError as e:
                            # Critical error, re-raise
                            client.logs.delete(id=this_flow_log.id)
                            span.record_exception(e)
                            raise e
                        except Exception as e:
                            # User-code failure: keep the Log, record the error, return None.
                            logger.error(f"Error calling {func.__name__}: {e}")
                            log_output = None
                            log_output_message = None
                            log_error = str(e)
                            func_output = None

                        updated_flow_log = {
                            "log_status": "complete",
                            "output": log_output,
                            "error": log_error,
                            "output_message": log_output_message,
                            "id": this_flow_log.id,
                        }
                        # Write the Flow Log to the Span on HL_LOG_OT_KEY
                        write_to_opentelemetry_span(
                            span=span,  # type: ignore [arg-type]
                            key=HUMANLOOP_LOG_KEY,
                            value=updated_flow_log,  # type: ignore
                        )
                        # Return the output of the decorated function
                        return func_output  # type: ignore [return-value]

        # Metadata consumed by the evals runner to treat this callable as a Flow File.
        wrapper.file = FileEvalConfig(  # type: ignore
            path=decorator_path,
            type=file_type,  # type: ignore [arg-type, typeddict-item]
            version=FlowDict(**flow_kernel),  # type: ignore
            callable=wrapper,
        )

        return wrapper

    return decorator
Loading