Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ dependencies = [
"boto3>=1.40.52",
"botocore>=1.40.52",
"pydantic>=2.0.0,<2.41.3",
"pyyaml>=6.0",
"urllib3>=1.26.0",
"starlette>=0.46.2",
"typing-extensions>=4.13.2,<5.0.0",
Expand Down
4 changes: 3 additions & 1 deletion src/bedrock_agentcore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
"""BedrockAgentCore Runtime SDK - A Python SDK for building and deploying AI agents."""

from .project import Project
from .runtime import BedrockAgentCoreApp, BedrockAgentCoreContext, RequestContext
from .runtime.models import PingStatus

__all__ = [
"BedrockAgentCoreApp",
"PingStatus",
"Project",
"RequestContext",
"BedrockAgentCoreContext",
"PingStatus",
]
10 changes: 9 additions & 1 deletion src/bedrock_agentcore/memory/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@

from .client import MemoryClient
from .controlplane import MemoryControlPlaneClient
from .memory import Memory
from .session import Actor, MemorySession, MemorySessionManager

__all__ = ["Actor", "MemoryClient", "MemorySession", "MemorySessionManager", "MemoryControlPlaneClient"]
__all__ = [
"Actor",
"Memory",
"MemoryClient",
"MemoryControlPlaneClient",
"MemorySession",
"MemorySessionManager",
]
234 changes: 234 additions & 0 deletions src/bedrock_agentcore/memory/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ class MemoryClient:
"get_event",
"delete_event",
"list_events",
"list_actors",
"list_sessions",
}

# AgentCore Memory control plane methods
Expand Down Expand Up @@ -779,6 +781,87 @@ def my_llm(user_input: str, memories: List[Dict]) -> str:
logger.info("Completed full conversation turn with LLM")
return retrieved_memories, agent_response, event

def list_actors(self, memory_id: str, max_results: int = 100) -> List[Dict[str, Any]]:
"""List all actors who have events in a memory.

Args:
memory_id: The memory resource ID
max_results: Maximum number of actors to return (default: 100)

Returns:
List of actor summary dictionaries

Example:
actors = client.list_actors(memory_id="mem-123")
for actor in actors:
print(f"Actor: {actor['actorId']}")
"""
logger.debug("Listing actors for memory: %s", memory_id)
try:
actors: List[Dict[str, Any]] = []
next_token = None

while len(actors) < max_results:
params: Dict[str, Any] = {"memoryId": memory_id}
if next_token:
params["nextToken"] = next_token

response = self.gmdp_client.list_actors(**params)
batch = response.get("actorSummaries", [])
actors.extend(batch)

next_token = response.get("nextToken")
if not next_token or len(actors) >= max_results:
break

logger.debug("Found %d actors", len(actors))
return actors[:max_results]

except ClientError as e:
logger.error("Failed to list actors: %s", e)
raise

def list_sessions(self, memory_id: str, actor_id: str, max_results: int = 100) -> List[Dict[str, Any]]:
"""List all sessions for an actor.

Args:
memory_id: The memory resource ID
actor_id: The actor ID
max_results: Maximum number of sessions to return (default: 100)

Returns:
List of session summary dictionaries

Example:
sessions = client.list_sessions(memory_id="mem-123", actor_id="user-456")
for session in sessions:
print(f"Session: {session['sessionId']}")
"""
logger.debug("Listing sessions for actor: %s in memory: %s", actor_id, memory_id)
try:
sessions: List[Dict[str, Any]] = []
next_token = None

while len(sessions) < max_results:
params: Dict[str, Any] = {"memoryId": memory_id, "actorId": actor_id}
if next_token:
params["nextToken"] = next_token

response = self.gmdp_client.list_sessions(**params)
batch = response.get("sessionSummaries", [])
sessions.extend(batch)

next_token = response.get("nextToken")
if not next_token or len(sessions) >= max_results:
break

logger.debug("Found %d sessions", len(sessions))
return sessions[:max_results]

except ClientError as e:
logger.error("Failed to list sessions: %s", e)
raise

def list_events(
self,
memory_id: str,
Expand Down Expand Up @@ -1852,6 +1935,104 @@ def wait_for_memories(
logger.info("Note: Encountered %d service errors during polling", service_errors)
return False

def enable_observability(
self,
memory_id: str,
memory_arn: Optional[str] = None,
enable_logs: bool = True,
enable_traces: bool = True,
) -> Dict[str, Any]:
"""Enable CloudWatch observability for a memory resource.

This method sets up CloudWatch Logs delivery for memory APPLICATION_LOGS
and optionally X-Ray delivery for TRACES.

Args:
memory_id: The memory resource ID
memory_arn: Optional memory ARN (constructed from memory_id if not provided)
enable_logs: Whether to enable APPLICATION_LOGS delivery (default: True)
enable_traces: Whether to enable TRACES delivery to X-Ray (default: True)

Returns:
Dictionary with status and configuration details:
{
"status": "success" | "failed",
"log_group": str, # CloudWatch log group name
"error": str # Only present if status is "failed"
}

Example:
result = client.enable_observability(
memory_id="mem-123",
enable_logs=True,
enable_traces=True
)
if result["status"] == "success":
print(f"Logs available at: {result['log_group']}")

Note:
This method requires CloudWatch Logs permissions:
- logs:CreateLogGroup
- logs:PutDeliverySource
- logs:PutDeliveryDestination
- logs:CreateDelivery
"""
# TODO: Implement ObservabilityDeliveryManager
# Reference implementation:
# https://github.com/aws/bedrock-agentcore-starter-toolkit/blob/main/src/bedrock_agentcore_starter_toolkit/operations/observability/delivery.py
raise NotImplementedError(
"enable_observability() is not yet implemented. "
"See starter-toolkit for reference: "
"https://github.com/aws/bedrock-agentcore-starter-toolkit/blob/main/src/bedrock_agentcore_starter_toolkit/operations/observability/delivery.py"
)

def disable_observability(
self,
memory_id: str,
delete_log_group: bool = False,
) -> Dict[str, Any]:
"""Disable CloudWatch observability for a memory resource.

This method removes the CloudWatch Logs delivery infrastructure
for the specified memory.

Args:
memory_id: The memory resource ID
delete_log_group: Whether to also delete the CloudWatch log group (default: False)

Returns:
Dictionary with status and any errors:
{
"status": "success" | "partial",
"errors": List[str] # Only present if status is "partial"
}

Example:
# Disable delivery but keep logs
result = client.disable_observability(memory_id="mem-123")

# Disable delivery and delete log group
result = client.disable_observability(
memory_id="mem-123",
delete_log_group=True
)

Note:
This method requires CloudWatch Logs permissions:
- logs:DeleteDeliverySource
- logs:DeleteDeliveryDestination
- logs:DeleteDelivery
- logs:DeleteLogGroup (if delete_log_group=True)
"""
# TODO: Implement ObservabilityDeliveryManager
# Reference implementation:
# https://github.com/aws/bedrock-agentcore-starter-toolkit/blob/main/src/bedrock_agentcore_starter_toolkit/operations/observability/delivery.py
raise NotImplementedError(
"disable_observability() is not yet implemented. "
"See starter-toolkit for reference: "
"https://github.com/aws/bedrock-agentcore-starter-toolkit/blob/main/src/bedrock_agentcore_starter_toolkit/operations/observability/delivery.py"
)

def add_strategy(self, memory_id: str, strategy: Dict[str, Any]) -> Dict[str, Any]:
"""Add a strategy to a memory (without waiting).

Expand All @@ -1873,6 +2054,59 @@ def add_strategy(self, memory_id: str, strategy: Dict[str, Any]) -> Dict[str, An
)
return self._add_strategy(memory_id, strategy)

def add_strategy_and_wait(
self,
memory_id: str,
strategy: Dict[str, Any],
max_wait: int = 300,
poll_interval: int = 10,
) -> Dict[str, Any]:
"""Add a strategy to a memory and wait for it to return to ACTIVE state.

This is a generic method that accepts any strategy type as a dictionary.
For typed convenience methods, use add_semantic_strategy_and_wait(),
add_summary_strategy_and_wait(), etc.

Args:
memory_id: Memory resource ID
strategy: Strategy configuration dictionary (e.g., {"semanticMemoryStrategy": {...}})
max_wait: Maximum seconds to wait (default: 300)
poll_interval: Seconds between status checks (default: 10)

Returns:
Updated memory object in ACTIVE state

Example:
# Add a semantic strategy
memory = client.add_strategy_and_wait(
memory_id="mem-123",
strategy={
"semanticMemoryStrategy": {
"name": "my-strategy",
"description": "Extract key facts",
"namespaces": ["facts/{actorId}/{sessionId}/"]
}
}
)

# Add a custom strategy
memory = client.add_strategy_and_wait(
memory_id="mem-123",
strategy={
"customMemoryStrategy": {
"name": "custom-strategy",
"configuration": {...}
}
}
)
"""
return self.update_memory_strategies_and_wait(
memory_id=memory_id,
add_strategies=[strategy],
max_wait=max_wait,
poll_interval=poll_interval,
)

# Private methods

def _normalize_memory_response(self, memory: Dict[str, Any]) -> Dict[str, Any]:
Expand Down
52 changes: 52 additions & 0 deletions src/bedrock_agentcore/memory/config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
"""Configuration models for Bedrock AgentCore Memory.

This module provides Pydantic models for Memory configuration
with YAML serialization support.
"""

from enum import Enum
from typing import Dict, List, Optional

from pydantic import BaseModel, ConfigDict, Field


class StrategyType(str, Enum):
"""Memory strategy types."""

SEMANTIC = "SEMANTIC"
SUMMARY = "SUMMARY"
USER_PREFERENCE = "USER_PREFERENCE"
CUSTOM_SEMANTIC = "CUSTOM_SEMANTIC"


class StrategyConfigModel(BaseModel):
"""Memory strategy configuration."""

model_config = ConfigDict(populate_by_name=True)

strategy_type: StrategyType = Field(alias="type")
namespace: str
custom_prompt: Optional[str] = Field(default=None, alias="customPrompt")


class MemoryConfigModel(BaseModel):
"""Complete memory configuration model.

This model represents the configuration for a Bedrock AgentCore memory,
suitable for YAML serialization and deserialization.

Attributes:
name: Unique memory name
description: Optional description
strategies: List of memory extraction strategies
encryption_key_arn: Optional KMS key ARN for encryption
tags: Resource tags
"""

model_config = ConfigDict(populate_by_name=True)

name: str
description: Optional[str] = None
strategies: Optional[List[StrategyConfigModel]] = None
encryption_key_arn: Optional[str] = Field(default=None, alias="encryptionKeyArn")
tags: Optional[Dict[str, str]] = None
Loading
Loading