-
Notifications
You must be signed in to change notification settings - Fork 82
feat(memory): event metadata state identification, message batching, and redundant sync elimination #244
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Replace actorId prefix-based approach with event metadata for distinguishing session and agent state events. Add auto-migration for legacy events. Changes: - Add StateType enum (SESSION, AGENT) and metadata keys - Update create_session/create_agent to include stateType metadata - Update read_session/read_agent to filter by metadata - Add backwards-compatible auto-migration: legacy events are converted to new format on read (create new with metadata, delete old) - Add tests for legacy migration behavior
… filters - Fix pagination bug in list_events where API returns nextToken even with 0 results, causing "metadata filter mismatch" error on subsequent page - Add _retry_with_backoff method for handling eventual consistency when reading newly created agents via metadata filter - Track created agent IDs to handle updates during consistency window - Update test to account for retry behavior in legacy migration
| }, | ||
| ) | ||
| # Track created agent for eventual consistency handling | ||
| self._created_agent_ids.add(session_agent.agent_id) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to say that we support multiple agents per session ? Does Strands support that for session managers ? And also, this means we're using metadata to store and retrieve events from one agent_ID and not branches.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_created_agent_ids is a fallback for when list_events_by_filter fails to find an agent due to eventual consistency. Newly created events aren't immediately visible when filtering by metadata - there's a short propagation delay. If the metadata query returns nothing but the agent_id is in _created_agent_ids, we know the agent was created and proceed accordingly rather than failing in update_agent
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strands supports multiple agents per session. However, we strongly advise against this to the customer by a log message in 582.
| ] | ||
|
|
||
| # Use retry with backoff to handle eventual consistency | ||
| events = self._retry_with_backoff( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we do retries for functions like read_agent ? It's causing unnecessary costs, maybe it's just a new agent that's empty.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Retries are needed because newly created events aren't immediately visible when filtering by event_metadata. Without retries, we may miss agent state updates.
) * feat(memory): add batch_size configuration for message buffering Add optional batch_size parameter to AgentCoreMemoryConfig that allows customers to buffer messages before sending to AgentCore Memory. Changes: - Add batch_size parameter (default=1, max=100) to AgentCoreMemoryConfig - Add message buffering with thread-safe _message_buffer and _buffer_lock - Modify create_message() to buffer when batch_size > 1 - Add flush_messages() to send all buffered messages - Add pending_message_count() to check buffer size - Add close() and context manager for cleanup - Add 24 unit tests covering batching functionality Default batch_size=1 preserves backward compatibility (immediate send). * feat(memory): implement true message batching in flush_messages() Previously, batch_size buffered messages but still made N separate API calls for N messages. Now flush_messages() groups conversational messages by session_id and combines them into a single create_event() call per session, significantly reducing API calls. Key changes: - Group conversational messages by session_id into combined payloads - Preserve message order within each session's payload (earliest first) - Use the latest timestamp from grouped messages for the combined event - Send blob messages (>9KB) individually (different API path) - Clear buffer only after ALL API calls succeed to prevent data loss - Improve error messages to include session context Tests added for critical scenarios: - Multiple sessions grouped into separate API calls - Latest timestamp used for combined events - Partial failure with multiple sessions preserves entire buffer - Multiple blob messages sent individually (not batched) - Mixed sessions with blobs and conversational messages Also fixes pre-existing test issues: - Fix test_read_agent_legacy_migration mock setup to match actual impl - Fix test_load_long_term_memories_with_validation_failure for strands API * refactor: rename flush_messages to _flush_messages (private method) * refactor(tests): deduplicate batching test fixtures Extract _create_session_manager helper to eliminate triple-nested context managers and move batching_config/batching_session_manager to module-level fixtures shared across all test classes. * fix(memory): prevent flush exception from masking original exception in __exit__ Wrap _flush_messages() in try/except so that if the with-block raises and flush also fails, the flush error is logged instead of replacing the original exception. * fix(memory): fix events_to_messages ordering for batched events Iterate reversed(events) instead of reversing the flat message list after iteration, which scrambled intra-event payload order for batched events. Add 6 unit tests covering multi-event ordering, batched payload ordering, mixed blob/conversational ordering, and malformed payloads.
Document the new batch_size parameter, message batching usage patterns, and metadata-related data models introduced in PR #244.
Summary
batch_size, and fixesevents_to_messages()ordering for batched eventssync_agentAPI calls via agent state hash tracking, eliminating ~6+ unnecessary API calls per conversation turn (code credit: @kevmyung)Fixes #220
Fixes #196
Changes
Event Metadata for State Identification
Session and agent state events are now identified using event metadata instead of encoding state type into the actorId, enabling cleaner actor identification.
Before:
After:
StateTypeenum (SESSION, AGENT) and metadata constantsEventMetadataFilterqueries instead of prefixed actorIdsBackwards Compatibility
Existing sessions created with the old prefixed actorId format continue to work without any code changes.
Message Batching
Multiple messages can now be grouped into single API calls, reducing the number of requests.
batch_sizeconfiguration option (1-100, default 1)events_to_messages()ordering for batched events by iterating in reverse instead of reversing the flat message listRedundant Sync Elimination
The strands framework's
register_hooksfiressync_agenton bothMessageAddedEventandAfterInvocationEvent, causing 2 API calls (read_agent+create_agent) per sync — ~6+ redundant calls per turn when agent state hasn't changed.created_at/updated_attimestamps) after each syncsync_agentcall; skip entirely when unchangedinitialize()so the first hook-triggered sync correctly detects unchanged stateTest plan
pytest tests/bedrock_agentcore/memory/integrations/strands/ -v)pytest tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py -k "Batching" -vpytest tests/bedrock_agentcore/memory/integrations/strands/test_bedrock_converter.py -v(24 tests)Future Work