The CapabilityWorker is the core SDK class for all I/O inside an Ability. Access it via self.capability_worker after initializing in call().
Converts text to speech using the Agent's default voice.
await self.capability_worker.speak("Hello! How can I help?")

Converts text to speech using a specific Voice ID. Use when your Ability needs its own voice.
await self.capability_worker.text_to_speech("Welcome!", "pNInz6obpgDQGcFmaJgB")
See the Voice ID catalog for available voices.

Waits for the user's next input. Returns a string.
user_input = await self.capability_worker.user_response()

Waits until the user has completely finished speaking. Use when you need the full utterance without premature cutoff.
full_input = await self.capability_worker.wait_for_complete_transcription()

Speaks the text, then waits for a response. Returns the user's reply.
answer = await self.capability_worker.run_io_loop("What's your name?")

Asks a yes/no question. Loops until the user confirms. Returns True or False.
confirmed = await self.capability_worker.run_confirmation_loop("Should I continue?")

Generates a text response using the configured LLM. This is synchronous (no await).
response = self.capability_worker.text_to_text_response(
    "Explain quantum computing in one sentence."
)

With conversation history:
history = [
    {"role": "user", "content": "Tell me about dogs"},
    {"role": "assistant", "content": "Dogs are loyal companions..."},
]
response = self.capability_worker.text_to_text_response(
    "What breeds are best for apartments?",
    history=history,
)

With a system prompt:
response = self.capability_worker.text_to_text_response(
    "The user asked about cooking pasta",
    system_prompt="You are a professional Italian chef. Keep responses under 2 sentences.",
)

Plays audio from bytes or a file-like object.
import requests
resp = requests.get("https://example.com/sound.mp3")
await self.capability_worker.play_audio(resp.content)

Plays an audio file from the Ability's folder.
await self.capability_worker.play_from_audio_file("alert.mp3")

For longer audio or real-time streaming:
await self.capability_worker.stream_init()
await self.capability_worker.send_audio_data_in_stream(audio_bytes, chunk_size=4096)
await self.capability_worker.stream_end()

Sends structured data over WebSocket. Used for music mode, DevKit actions, and custom events.
await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "on"})

Sends a hardware action to the DevKit.
await self.capability_worker.send_devkit_action("led-on")

You MUST call this when your Ability is done. Returns control to the Agent.
self.capability_worker.resume_normal_flow()
If you forget this, the Agent will be stuck and unresponsive.
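One way to make sure resume_normal_flow() always runs, even when the Ability raises partway through, is to wrap the body in try/finally. The sketch below is an illustration only: FakeCapabilityWorker is a hypothetical stand-in written for this example (it is not part of the SDK), and run_ability is an invented function name. Only speak() and resume_normal_flow() mirror calls shown above.

```python
import asyncio


class FakeCapabilityWorker:
    """Hypothetical stand-in for the SDK's CapabilityWorker (illustration only)."""

    def __init__(self):
        self.spoken = []      # records every text passed to speak()
        self.resumed = False  # flips to True once control is handed back

    async def speak(self, text):
        self.spoken.append(text)

    def resume_normal_flow(self):
        self.resumed = True


async def run_ability(capability_worker, fail=False):
    """Run the Ability body and always hand control back to the Agent."""
    try:
        await capability_worker.speak("Working on it.")
        if fail:
            raise RuntimeError("simulated failure inside the Ability")
        await capability_worker.speak("Done.")
    finally:
        # Runs on success and on error, so the Agent is never left waiting.
        capability_worker.resume_normal_flow()
```

With this shape, an exception still propagates to the caller, but the hand-back to the Agent has already happened by the time it does.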

Stops current assistant output and returns control to user input. Call this before speak() or play_audio() from a background daemon to avoid audio overlap.
await self.capability_worker.send_interrupt_signal()

Returns the current user's timezone string.
timezone = self.capability_worker.get_timezone()
Synchronous. Returns a string like America/Chicago or None if unavailable.
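Because get_timezone() can return None, code that formats local times needs a fallback. The following is a minimal sketch using Python's standard zoneinfo module. The helper names resolve_timezone and local_now are invented for this example, and falling back to UTC is an assumption, not documented SDK behavior.

```python
from datetime import datetime, timezone, tzinfo
from typing import Optional
from zoneinfo import ZoneInfo, ZoneInfoNotFoundError


def resolve_timezone(name: Optional[str]) -> tzinfo:
    """Turn a timezone string (or None) into a tzinfo, defaulting to UTC."""
    if not name:
        return timezone.utc
    try:
        return ZoneInfo(name)
    except (ZoneInfoNotFoundError, ValueError):
        # Unknown or malformed name, or no timezone database on this system.
        return timezone.utc


def local_now(name: Optional[str]) -> datetime:
    """Current time in the user's timezone, or in UTC when it is unknown."""
    return datetime.now(resolve_timezone(name))
```

Inside an Ability this could be called as local_now(self.capability_worker.get_timezone()).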

Returns the linked account access token for the current user.
token = self.capability_worker.get_token("google")
self.worker.editor_logging_handler.info(token)
Synchronous. linked_platform must be one of: Google ("google"), Slack ("slack"), Discord ("discord").
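Two small guards can be useful around get_token(): checking the platform name before the call, and masking the token before it reaches a log. The sketch below is illustrative. check_platform and mask_token are hypothetical helpers, not SDK functions, and the handling of a missing token assumes get_token() might return None, which the text above does not state.

```python
from typing import Optional

# Platform identifiers listed in the text above.
SUPPORTED_PLATFORMS = {"google", "slack", "discord"}


def check_platform(linked_platform: str) -> str:
    """Return the normalized platform name, or raise ValueError if unsupported."""
    name = linked_platform.strip().lower()
    if name not in SUPPORTED_PLATFORMS:
        raise ValueError(f"unsupported linked_platform: {linked_platform!r}")
    return name


def mask_token(token: Optional[str], visible: int = 4) -> str:
    """Keep the first `visible` characters and replace the rest with '*'."""
    if not token:
        return "<no token>"  # assumption: a missing token may show up as None or ""
    if len(token) <= visible:
        return "*" * len(token)
    return token[:visible] + "*" * (len(token) - visible)
```

A log call could then pass mask_token(token) instead of the raw token, so the full credential never lands in the editor logs.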

Returns the full conversation history from the current session.
history = self.capability_worker.get_full_message_history()
Use this to read what happened before your Ability was triggered; it gives context for smarter responses.
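A full session history can be long, so it may help to trim it before passing it on as the history argument of text_to_text_response(). The sketch below assumes the returned messages are dicts with "role" and "content" keys, the same shape as the history example earlier. That shape is an assumption, not something the text confirms. recent_history is a hypothetical helper name.

```python
from typing import Any, Dict, List


def recent_history(messages: List[Any], limit: int = 6) -> List[Dict[str, str]]:
    """Keep the last `limit` user/assistant messages that have non-empty content."""
    cleaned = [
        {"role": m["role"], "content": m["content"]}
        for m in messages
        if isinstance(m, dict)
        and m.get("role") in ("user", "assistant")
        and m.get("content")
    ]
    # A slice of [-0:] would return everything, so handle limit <= 0 explicitly.
    return cleaned[-limit:] if limit > 0 else []
```

The result could then be supplied as history=recent_history(history) in a text_to_text_response() call.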
A built-in key-value store for persisting structured user data across sessions. All methods are synchronous (no await). Each key stores a dict as its value. Storage is scoped at the user level: any Ability can read and write any key for a given user.
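Since every Ability shares one keyspace per user, two Abilities that both pick a key such as "preferences" would collide. One possible convention is to prefix keys with the Ability's name. The sketch below illustrates the idea with InMemoryKeyStore, a hypothetical in-memory stand-in that mimics the method behavior documented below. It is not the SDK's implementation, and raising KeyError for the documented error cases is an assumption. namespaced is an invented helper name.

```python
class InMemoryKeyStore:
    """Hypothetical stand-in for the user-scoped key-value store (illustration only)."""

    def __init__(self):
        self._data = {}

    def create_key(self, key, value):
        if key in self._data:
            raise KeyError(f"key already exists: {key}")  # assumed error type
        self._data[key] = dict(value)

    def get_single_key(self, key):
        value = self._data.get(key)
        return dict(value) if value is not None else None

    def get_all_keys(self):
        return {k: dict(v) for k, v in self._data.items()}


def namespaced(ability_name: str, key: str) -> str:
    """Prefix a key with the Ability's name so Abilities do not overwrite each other."""
    return f"{ability_name}:{key}"


store = InMemoryKeyStore()
# Two Abilities store "preferences" for the same user without colliding.
store.create_key(namespaced("weather", "preferences"), {"units": "metric"})
store.create_key(namespaced("music", "preferences"), {"volume": 7})
```

The separator and the prefix are a matter of convention; what matters is that every Ability applies the same scheme consistently.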

Creates a new key-value pair. Errors if the key already exists.
self.capability_worker.create_key(
    key="user_preferences",
    value={"language": "en", "theme": "dark", "notifications": True}
)

Replaces the value at an existing key with a new dict. Errors if the key doesn't exist.
self.capability_worker.update_key(
    key="user_preferences",
    value={"language": "en", "theme": "light", "notifications": False}
)

Permanently removes a stored key-value pair.
self.capability_worker.delete_key("user_preferences")

Returns all stored key-value pairs for the current user as a dict.
all_context = self.capability_worker.get_all_keys()
# Returns: {"user_preferences": {"theme": "light"}, "last_session": {...}}

Returns the dict stored at a specific key, or None if the key doesn't exist.
preferences = self.capability_worker.get_single_key("user_preferences")
# Returns: {"language": "en", "theme": "light"} or None

create_key errors if the key exists; update_key errors if it doesn't. Always check first:
existing = self.capability_worker.get_single_key("user_preferences")
if existing is not None:  # an empty dict is falsy but still means the key exists
    self.capability_worker.update_key("user_preferences", new_value)
else:
    self.capability_worker.create_key("user_preferences", new_value)

# Save state when workflow starts
self.capability_worker.create_key(
    key="booking_flow_1234",
    value={"destination": "Dubai", "step": "awaiting_date"}
)
# Advance state
self.capability_worker.update_key(
    key="booking_flow_1234",
    value={"destination": "Dubai", "step": "confirmed"}
)
# Resume from state
context = self.capability_worker.get_single_key("booking_flow_1234")
# Clean up
self.capability_worker.delete_key("booking_flow_1234")

Access via self.worker:
self.worker.editor_logging_handler.info("Something happened")
self.worker.editor_logging_handler.error("Something broke")
self.worker.editor_logging_handler.warning("Something looks off")
Never use print(). Always use the logging handler.

self.worker.session_tasks.create(some_coroutine())  # Instead of asyncio.create_task()
await self.worker.session_tasks.sleep(2.0)  # Instead of asyncio.sleep()

self.worker.music_mode_event.set()  # Enter music mode
self.worker.music_mode_event.clear()  # Exit music mode
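The session_tasks calls above can be pictured with a small background job. The sketch below uses FakeSessionTasks, a hypothetical stand-in that simply wraps asyncio so the example runs on its own. The real session_tasks object presumably does more, and whether its create() returns an awaitable task is an assumption made for this illustration. countdown and main are invented names.

```python
import asyncio


class FakeSessionTasks:
    """Hypothetical stand-in for self.worker.session_tasks (illustration only)."""

    def __init__(self):
        self.tasks = []

    def create(self, coro):
        # Assumption: the real create() also schedules the coroutine as a task.
        task = asyncio.create_task(coro)
        self.tasks.append(task)
        return task

    async def sleep(self, seconds):
        await asyncio.sleep(seconds)


async def countdown(session_tasks, ticks, interval, log):
    """Background job: record each remaining tick, pausing between ticks."""
    for remaining in range(ticks, 0, -1):
        log.append(remaining)
        await session_tasks.sleep(interval)
    log.append("done")


async def main():
    session_tasks = FakeSessionTasks()
    log = []
    task = session_tasks.create(countdown(session_tasks, 3, 0.01, log))
    await task  # wait here only so the example can return the finished log
    return log
```

In an Ability, the same countdown body would call self.worker.session_tasks.sleep() and be started with self.worker.session_tasks.create(), as shown in the lines above.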