-
Notifications
You must be signed in to change notification settings - Fork 128
Expand file tree
/
Copy pathsip_lifecycle.py
More file actions
245 lines (198 loc) · 9.77 KB
/
sip_lifecycle.py
File metadata and controls
245 lines (198 loc) · 9.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
"""
---
title: SIP Lifecycle Management Agent
category: telephony
tags: [telephony, sip, deepgram, openai]
difficulty: advanced
description: Advanced SIP agent demonstrating complete call lifecycle management
demonstrates:
- SIP participant management and addition
- Call status tracking and event handling
- Room management and deletion
- Participant attribute monitoring
- Call lifecycle event handlers
- Function tools for call operations
---
"""
import asyncio
import logging
import os
import uuid
from dotenv import load_dotenv
from livekit.agents import AgentServer, AgentSession, JobContext, JobProcess, cli, Agent, inference, RunContext, function_tool
from livekit import rtc
from livekit import api
from livekit.plugins import silero
# Populate the environment through python-dotenv before anything reads it
# (add_sip_participant later looks up SIP_TRUNK_ID via os.environ).
load_dotenv()
# Module-wide logger; the level is set to INFO so the call-lifecycle
# messages logged throughout this file are not filtered out by this logger.
logger = logging.getLogger("sip-lifecycle-agent")
logger.setLevel(logging.INFO)
class SIPLifecycleAgent(Agent):
    """Voice agent exposing SIP call-lifecycle operations as function tools.

    The tools dial a phone number into the current room, log the room's
    participants, and end the call by deleting the room. Every tool needs the
    job context given to the constructor; without it the tool apologises to
    the user and returns a failure message.

    Each tool returns a ``(None, message)`` tuple, where ``message`` is a
    short status string describing the outcome.
    """

    # NOTE(review): the docstrings of the @function_tool methods below are
    # presumably surfaced to the LLM as tool descriptions, so their wording is
    # kept as-is; maintainer notes live in `#` comments instead.

    def __init__(self, job_context=None) -> None:
        """Store the job context and configure the agent's instructions.

        Args:
            job_context: Object exposing ``room`` and ``api``; the tools use it
                to reach the room and the server API. When it is ``None``
                every tool reports failure.
        """
        self.job_context = job_context
        super().__init__(
            instructions=(
                "\n"
                "You are a helpful assistant demonstrating SIP call lifecycle management.\n"
                "You can add SIP participants and end the call when requested.\n"
            ),
        )

    @function_tool
    async def add_sip_participant(self, context: RunContext, phone_number: str):
        """
        Add a SIP participant to the current call.

        Args:
            context: The call context
            phone_number: The phone number to call
        """
        if not self.job_context:
            logger.error("No job context available")
            await self.session.say("I'm sorry, I can't add participants at this time.")
            return None, "Failed to add SIP participant: No job context available"

        sip_trunk_id = os.environ.get("SIP_TRUNK_ID")
        if not sip_trunk_id:
            # Without a trunk there is nothing to dial out through, so fail
            # here with a clear message instead of sending an incomplete
            # request and surfacing whatever error the server returns.
            logger.error("SIP_TRUNK_ID environment variable is not set")
            await self.session.say(f"I'm sorry, I couldn't add {phone_number} to the call.")
            return None, "Failed to add SIP participant: SIP_TRUNK_ID is not configured"

        room_name = self.job_context.room.name
        # Short random suffix gives each dialed participant its own identity.
        identity = f"sip_{uuid.uuid4().hex[:8]}"
        logger.info(f"Adding SIP participant with phone number {phone_number} to room {room_name}")

        try:
            response = await self.job_context.api.sip.create_sip_participant(
                api.CreateSIPParticipantRequest(
                    sip_trunk_id=sip_trunk_id,
                    sip_call_to=phone_number,
                    room_name=room_name,
                    participant_identity=identity,
                    participant_name=f"SIP Participant {phone_number}",
                    krisp_enabled=True,
                )
            )
        except Exception as e:
            # Tool boundary: report the failure to the user and the LLM rather
            # than letting it propagate; logger.exception keeps the traceback.
            logger.exception(f"Error adding SIP participant: {e}")
            await self.session.say(f"I'm sorry, I couldn't add {phone_number} to the call.")
            return None, f"Failed to add SIP participant: {e}"

        logger.info(f"Successfully added SIP participant: {response}")
        return None, f"Added SIP participant {phone_number} to the call."

    @function_tool
    async def end_call(self, context: RunContext):
        """
        End the current call by deleting the room.
        """
        if not self.job_context:
            logger.error("No job context available")
            await self.session.say("I'm sorry, I can't end the call at this time.")
            return None, "Failed to end call: No job context available"

        room_name = self.job_context.room.name
        logger.info(f"Ending call by deleting room {room_name}")

        try:
            # Ask for a goodbye reply first, then delete the room.
            await context.session.generate_reply(
                instructions="Thank you for your time. I'll be ending this call now. Goodbye!"
            )
            await self.job_context.api.room.delete_room(
                api.DeleteRoomRequest(room=room_name)
            )
        except Exception as e:
            logger.exception(f"Error ending call: {e}")
            return None, f"Failed to end call: {e}"

        logger.info(f"Successfully deleted room {room_name}")
        return None, "Call ended successfully."

    @function_tool
    async def log_participants(self, context: RunContext):
        """
        Log all participants in the current room.
        """
        if not self.job_context:
            logger.error("No job context available")
            await self.session.say("I'm sorry, I can't list participants at this time.")
            return None, "Failed to list participants: No job context available"

        room_name = self.job_context.room.name
        logger.info(f"Logging participants in room {room_name}")

        try:
            response = await self.job_context.api.room.list_participants(
                api.ListParticipantsRequest(room=room_name)
            )
            participants = response.participants
            participant_info = [
                {
                    "identity": p.identity,
                    "name": p.name,
                    "state": p.state,
                    "is_publisher": p.is_publisher,
                }
                for p in participants
            ]
            logger.info(f"Participants in room {room_name}: {participant_info}")
            await self.session.say(f"There are {len(participants)} participants in this call.")
            return None, f"Listed {len(participants)} participants in the room."
        except Exception as e:
            logger.exception(f"Error listing participants: {e}")
            return None, f"Failed to list participants: {e}"

    async def on_enter(self):
        """Request an opening reply from the session when the agent enters."""
        # The call's result is deliberately not awaited here, matching the
        # original behaviour of only requesting the reply.
        self.session.generate_reply()
server = AgentServer()


def prewarm(proc: JobProcess):
    """Load the Silero VAD model and store it in the process userdata.

    The model is saved under the ``"vad"`` key, which is the key the session
    entrypoint reads when it builds the agent session.
    """
    vad_model = silero.VAD.load()
    proc.userdata["vad"] = vad_model


# Register prewarm as the server's setup function.
server.setup_fnc = prewarm
@server.rtc_session()
async def entrypoint(ctx: JobContext):
    """Start the SIP lifecycle agent for a job and watch room participants.

    Builds an AgentSession (STT, LLM, TTS, plus the VAD stored in the process
    userdata under ``"vad"``), starts it with a SIPLifecycleAgent bound to this
    job context, connects, and then registers room event handlers that log
    participant connections and SIP call-status changes.

    Args:
        ctx: The job context; provides the room, the process userdata and the
            connection used by the agent.
    """
    ctx.log_context_fields = {"room": ctx.room.name}

    session = AgentSession(
        stt=inference.STT(model="deepgram/nova-3", language="en"),
        llm=inference.LLM(model="openai/gpt-4.1-mini"),
        tts=inference.TTS(model="elevenlabs/eleven_multilingual_v2"),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
    )
    agent = SIPLifecycleAgent(job_context=ctx)

    await session.start(agent=agent, room=ctx.room)
    await ctx.connect()

    # Log line for each known sip.callStatus value seen when a participant
    # connects; unknown statuses are not given an extra line.
    status_on_connect = {
        'active': "Call is active and connected",
        'automation': "Call is connected and dialing DTMF numbers",
        'dialing': "Call is dialing and waiting to be picked up",
        'hangup': "Call has been ended by a participant",
        'ringing': "Inbound call is ringing for the caller",
    }
    # Same statuses, worded for an update to an existing call.
    status_on_change = {
        'active': "Call is now active and connected",
        'automation': "Call is now connected and dialing DTMF numbers",
        'dialing': "Call is now dialing and waiting to be picked up",
        'hangup': "Call has been ended by a participant",
        'ringing': "Inbound call is now ringing for the caller",
    }

    # asyncio keeps only weak references to tasks, so a task whose result is
    # discarded can be garbage-collected before it finishes. Hold a strong
    # reference until each handler task completes.
    background_tasks = set()

    def on_task_done(task):
        """Drop the finished task and log any exception it raised."""
        background_tasks.discard(task)
        if task.cancelled():
            return
        error = task.exception()
        if error is not None:
            logger.error(f"Participant event handler failed: {error}")

    def spawn(coro):
        """Schedule coro as a task that is kept alive until it completes."""
        task = asyncio.create_task(coro)
        background_tasks.add(task)
        task.add_done_callback(on_task_done)

    # The room event callbacks are plain functions, so the async work is
    # handed off to tasks.
    def on_participant_connected_handler(participant: rtc.RemoteParticipant):
        spawn(async_on_participant_connected(participant))

    def on_participant_attributes_changed_handler(changed_attributes: dict, participant: rtc.Participant):
        spawn(async_on_participant_attributes_changed(changed_attributes, participant))

    async def async_on_participant_connected(participant: rtc.RemoteParticipant):
        """Log a newly connected participant (with SIP details) and greet them."""
        logger.info(f"New participant connected: {participant.identity}")

        # Check if this is a SIP participant and log call status
        if participant.kind == rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
            logger.info(f"SIP participant connected: {participant.identity}")

            # Log SIP attributes
            if participant.attributes:
                call_id = participant.attributes.get('sip.callID', 'Unknown')
                call_status = participant.attributes.get('sip.callStatus', 'Unknown')
                phone_number = participant.attributes.get('sip.phoneNumber', 'Unknown')
                trunk_id = participant.attributes.get('sip.trunkID', 'Unknown')
                trunk_phone = participant.attributes.get('sip.trunkPhoneNumber', 'Unknown')

                logger.info(f"SIP Call ID: {call_id}")
                logger.info(f"SIP Call Status: {call_status}")
                logger.info(f"SIP Phone Number: {phone_number}")
                logger.info(f"SIP Trunk ID: {trunk_id}")
                logger.info(f"SIP Trunk Phone Number: {trunk_phone}")

                # Log specific call status information
                status_message = status_on_connect.get(call_status)
                if status_message is not None:
                    logger.info(status_message)

        # NOTE(review): the greeting is issued for every participant, SIP or
        # not - confirm this matches the intended nesting.
        await agent.session.say(f"Welcome, {participant.name or participant.identity}! I can help you add a participant to this call or end the call.")

    async def async_on_participant_attributes_changed(changed_attributes: dict, participant: rtc.Participant):
        """Log attribute changes, describing SIP call-status transitions."""
        logger.info(f"Participant {participant.identity} attributes changed: {changed_attributes}")

        # Only SIP participants carrying a changed sip.callStatus get the
        # additional status lines.
        if participant.kind != rtc.ParticipantKind.PARTICIPANT_KIND_SIP:
            return
        if 'sip.callStatus' not in changed_attributes:
            return

        call_status = changed_attributes['sip.callStatus']
        logger.info(f"SIP Call Status updated: {call_status}")

        # Log specific call status information
        status_message = status_on_change.get(call_status)
        if status_message is not None:
            logger.info(status_message)

    ctx.room.on("participant_connected", on_participant_connected_handler)
    ctx.room.on("participant_attributes_changed", on_participant_attributes_changed_handler)
if __name__ == "__main__":
    # Script entry point: hand the module-level server to the LiveKit CLI runner.
    cli.run_app(server)