LiveKit
Integrate Orq.ai with LiveKit Agents using OpenTelemetry
Getting Started
LiveKit is a platform for building real-time audio and video applications with AI agents. Tracing LiveKit Agents with Orq.ai provides comprehensive insights into real-time sessions, voice interactions, transcription performance, and AI response times to optimize your conversational AI applications.
Prerequisites
Before you begin, ensure you have:
- An Orq.ai account and API key
- Python 3.8+ installed
- LiveKit Agents SDK installed
- LiveKit server access (cloud or self-hosted)
- API keys for speech and LLM providers
Install Dependencies
# Core LiveKit Agents
pip install livekit-agents livekit-api livekit-plugins-openai
# OpenTelemetry packages
pip install opentelemetry-sdk opentelemetry-exporter-otlp
# Speech and LLM providers (choose what you need)
pip install openai deepgram-sdk elevenlabs
# Additional utilities
pip install python-dotenv asyncio
Configure Orq.ai
Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:
Unix/Linux/macOS:
export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=livekit-agents,service.version=1.0.0"
export LIVEKIT_URL="<YOUR_LIVEKIT_URL>"
export LIVEKIT_API_KEY="<YOUR_LIVEKIT_API_KEY>"
export LIVEKIT_API_SECRET="<YOUR_LIVEKIT_API_SECRET>"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
Windows (PowerShell):
$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=livekit-agents,service.version=1.0.0"
$env:LIVEKIT_URL = "<YOUR_LIVEKIT_URL>"
$env:LIVEKIT_API_KEY = "<YOUR_LIVEKIT_API_KEY>"
$env:LIVEKIT_API_SECRET = "<YOUR_LIVEKIT_API_SECRET>"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"
Using .env file:
OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=livekit-agents,service.version=1.0.0
LIVEKIT_URL=<YOUR_LIVEKIT_URL>
LIVEKIT_API_KEY=<YOUR_LIVEKIT_API_KEY>
LIVEKIT_API_SECRET=<YOUR_LIVEKIT_API_SECRET>
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>
Integrations
LiveKit supports manual OpenTelemetry integration for comprehensive real-time observability:
Manual OpenTelemetry Setup
Create comprehensive tracing configuration:
# tracing.py
import os
from opentelemetry import trace, baggage
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
# Configure resource
resource = Resource.create({
ResourceAttributes.SERVICE_NAME: "livekit-agents",
ResourceAttributes.SERVICE_VERSION: "1.0.0",
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: os.getenv("ENVIRONMENT", "development"),
})
# Configure exporter
exporter = OTLPSpanExporter(
endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + "/v1/traces",
headers={"Authorization": os.getenv("OTEL_EXPORTER_OTLP_HEADERS").split("=")[1]}
)
# Set up tracing
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Get tracer
tracer = trace.get_tracer("livekit", "1.0.0")
Examples
Basic Voice Agent with Tracing
import asyncio
import time
from livekit.agents import AutoAgent, WorkerOptions, cli
from livekit.agents.llm import ChatMessage, ChatRole
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import openai, deepgram
from tracing import tracer
from opentelemetry import trace, baggage
class TracedVoiceAgent:
def __init__(self):
self.llm = openai.LLM()
self.stt = deepgram.STT()
self.tts = openai.TTS()
async def entrypoint(self, ctx: AutoAgent.Context):
"""Main agent entrypoint with comprehensive tracing"""
# Set baggage for context propagation
ctx_with_baggage = baggage.set_baggage("session.id", ctx.room.name or "unknown")
with tracer.start_as_current_span(
"livekit.session",
context=ctx_with_baggage,
attributes={
"session.room_name": ctx.room.name or "unknown",
"session.participant_count": len(ctx.room.participants),
"agent.type": "voice_assistant",
}
) as session_span:
try:
await ctx.connect(auto_subscribe=AutoAgent.ConnectionConfig.AutoSubscribe.AUDIO_ONLY)
# Create voice assistant with tracing
assistant = VoiceAssistant(
llm=self.llm,
stt=self.stt,
tts=self.tts,
chat_ctx=self._create_initial_context(),
)
# Set up event handlers with tracing
assistant.on("user_speech_committed", self._on_user_speech)
assistant.on("agent_speech_committed", self._on_agent_speech)
session_span.set_attributes({
"assistant.llm.model": "gpt-4",
"assistant.stt.provider": "deepgram",
"assistant.tts.provider": "openai",
})
# Start the assistant
assistant.start(ctx.room)
await assistant.aclose()
session_span.set_attribute("session.status", "completed")
except Exception as e:
session_span.record_exception(e)
session_span.set_status(trace.StatusCode.ERROR, str(e))
raise
def _create_initial_context(self) -> list[ChatMessage]:
"""Create initial chat context"""
return [
ChatMessage(
role=ChatRole.SYSTEM,
content="You are a helpful voice assistant. Respond naturally and conversationally."
)
]
def _on_user_speech(self, message: str):
"""Handle user speech events with tracing"""
with tracer.start_as_current_span(
"livekit.user_speech",
attributes={
"speech.type": "user",
"speech.text.length": len(message),
}
) as span:
span.set_attribute("speech.text", message[:200])
print(f"User said: {message}")
def _on_agent_speech(self, message: str):
"""Handle agent speech events with tracing"""
with tracer.start_as_current_span(
"livekit.agent_speech",
attributes={
"speech.type": "agent",
"speech.text.length": len(message),
}
):
print(f"Agent said: {message}")
# Initialize and run the agent
if __name__ == "__main__":
cli.run_app(
WorkerOptions(
entrypoint_fnc=TracedVoiceAgent().entrypoint,
)
)
Real-time Performance Monitoring
import asyncio
import time
from livekit.agents import AutoAgent
from livekit import rtc
from tracing import tracer
from opentelemetry import trace
from dataclasses import dataclass
@dataclass
class PerformanceMetrics:
latency_ms: float
audio_quality: float
packet_loss: float
class PerformanceMonitor:
def __init__(self):
self.metrics_history = []
self.session_start_time = time.time()
async def collect_metrics(self, room: rtc.Room) -> PerformanceMetrics:
"""Collect comprehensive performance metrics"""
with tracer.start_as_current_span(
"livekit.performance.collect_metrics"
) as span:
# Network metrics (simulated)
latency_ms = await self._measure_network_latency()
packet_loss = 0.1 # Mock packet loss
audio_quality = 0.95 # Mock audio quality
metrics = PerformanceMetrics(
latency_ms=latency_ms,
audio_quality=audio_quality,
packet_loss=packet_loss
)
span.set_attributes({
"performance.latency_ms": latency_ms,
"performance.audio_quality": audio_quality,
"performance.packet_loss": packet_loss,
})
self.metrics_history.append(metrics)
return metrics
async def _measure_network_latency(self) -> float:
"""Measure network latency"""
start_time = time.time()
await asyncio.sleep(0.01) # Simulate measurement
return (time.time() - start_time) * 1000
class MonitoredVoiceAgent:
def __init__(self):
self.performance_monitor = PerformanceMonitor()
self.llm = openai.LLM()
self.stt = openai.STT()
self.tts = openai.TTS()
async def entrypoint(self, ctx: AutoAgent.Context):
"""Voice agent with performance monitoring"""
with tracer.start_as_current_span(
"livekit.monitored_session",
attributes={
"session.monitoring": True,
"session.performance_tracking": True,
}
) as session_span:
try:
await ctx.connect()
# Start performance monitoring
monitor_task = asyncio.create_task(
self._performance_monitoring_loop(ctx.room, session_span)
)
assistant = VoiceAssistant(
llm=self.llm,
stt=self.stt,
tts=self.tts,
)
assistant.start(ctx.room)
try:
await assistant.aclose()
finally:
monitor_task.cancel()
await self._generate_performance_summary(session_span)
except Exception as e:
session_span.record_exception(e)
session_span.set_status(trace.StatusCode.ERROR, str(e))
raise
async def _performance_monitoring_loop(self, room: rtc.Room, session_span):
"""Continuous performance monitoring"""
while True:
try:
metrics = await self.performance_monitor.collect_metrics(room)
# Alert on performance issues
if metrics.latency_ms > 200:
session_span.add_event("High latency detected", {
"latency_ms": metrics.latency_ms
})
await asyncio.sleep(5) # Monitor every 5 seconds
except asyncio.CancelledError:
break
async def _generate_performance_summary(self, session_span):
"""Generate final performance summary"""
if not self.performance_monitor.metrics_history:
return
metrics = self.performance_monitor.metrics_history
avg_latency = sum(m.latency_ms for m in metrics) / len(metrics)
session_span.set_attributes({
"performance.summary.avg_latency_ms": avg_latency,
"performance.summary.metrics_collected": len(metrics),
})
# Run the monitored agent
if __name__ == "__main__":
import livekit.agents.cli as cli
cli.run_app(
WorkerOptions(
entrypoint_fnc=MonitoredVoiceAgent().entrypoint,
)
)
Next Steps
✅ Verify traces: Check your Orq.ai dashboard to see incoming traces
✅ Add custom attributes: Enhance traces with session-specific metadata
✅ Set up alerts: Configure monitoring for voice quality degradation
✅ Explore metrics: Use trace data for real-time performance optimization
✅ Monitor user interactions: Track conversation flow and engagement
Related Documentation
Support
Updated about 24 hours ago