-
Notifications
You must be signed in to change notification settings - Fork 80
feat(memory): implement true message batching for flush_messages() #256
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(memory): implement true message batching for flush_messages() #256
Conversation
Add optional batch_size parameter to AgentCoreMemoryConfig that allows customers to buffer messages before sending to AgentCore Memory. Changes: - Add batch_size parameter (default=1, max=100) to AgentCoreMemoryConfig - Add message buffering with thread-safe _message_buffer and _buffer_lock - Modify create_message() to buffer when batch_size > 1 - Add flush_messages() to send all buffered messages - Add pending_message_count() to check buffer size - Add close() and context manager for cleanup - Add 24 unit tests covering batching functionality Default batch_size=1 preserves backward compatibility (immediate send).
Previously, batch_size buffered messages but still made N separate API calls for N messages. Now flush_messages() groups conversational messages by session_id and combines them into a single create_event() call per session, significantly reducing API calls. Key changes: - Group conversational messages by session_id into combined payloads - Preserve message order within each session's payload (earliest first) - Use the latest timestamp from grouped messages for the combined event - Send blob messages (>9KB) individually (different API path) - Clear buffer only after ALL API calls succeed to prevent data loss - Improve error messages to include session context Tests added for critical scenarios: - Multiple sessions grouped into separate API calls - Latest timestamp used for combined events - Partial failure with multiple sessions preserves entire buffer - Multiple blob messages sent individually (not batched) - Mixed sessions with blobs and conversational messages Also fixes pre-existing test issues: - Fix test_read_agent_legacy_migration mock setup to match actual impl - Fix test_load_long_term_memories_with_validation_failure for strands API
|
|
||
| # region Batching support | ||
|
|
||
| def flush_messages(self) -> list[dict[str, Any]]: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this should be private method.
| """ | ||
| self.flush_messages() | ||
|
|
||
| # endregion Batching support |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this a comment?
| Returns: | ||
| int: Number of buffered messages waiting to be sent. | ||
| """ | ||
| with self._buffer_lock: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There must be a some atomic library that would incorporate the lock whenever you do operations in _message_buffer.
| return [] | ||
|
|
||
| results = [] | ||
| for session_id, messages, is_blob, monotonic_timestamp in messages_to_send: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not actually buffering right now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can take all the messages_to_send and put it under messages, if it's not a blob. https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/bedrock-agentcore/client/create_event.html. Make sure the messages are ordered in the earilers to latest when storing in one create_event.
Extract _create_session_manager helper to eliminate triple-nested context managers and move batching_config/batching_session_manager to module-level fixtures shared across all test classes.
…in __exit__ Wrap _flush_messages() in try/except so that if the with-block raises and flush also fails, the flush error is logged instead of replacing the original exception.
Iterate reversed(events) instead of reversing the flat message list after iteration, which scrambled intra-event payload order for batched events. Add 6 unit tests covering multi-event ordering, batched payload ordering, mixed blob/conversational ordering, and malformed payloads.
Summary
batch_sizeconfiguration option for message buffering (1-100, default 1)flush_messages()- groups conversational messages by session_id into single API callsevents_to_messages()ordering bug: iteratereversed(events)instead of reversing the flat message list after iteration, which scrambled intra-event payload order for batched eventsTest plan
pytest tests/bedrock_agentcore/memory/integrations/strands/test_agentcore_memory_session_manager.py -k "Batching" -vpytest tests/bedrock_agentcore/memory/integrations/strands/test_bedrock_converter.py -v