LiveKit

Integrate Orq.ai with LiveKit Agents using OpenTelemetry

Getting Started

LiveKit is a platform for building real-time audio and video applications with AI agents. Tracing LiveKit Agents with Orq.ai provides comprehensive insights into real-time sessions, voice interactions, transcription performance, and AI response times to optimize your conversational AI applications.

Prerequisites

Before you begin, ensure you have:

  • An Orq.ai account and API key
  • Python 3.9+ installed (required by the LiveKit Agents SDK)
  • LiveKit Agents SDK installed
  • LiveKit server access (cloud or self-hosted)
  • API keys for speech and LLM providers

Install Dependencies

# Core LiveKit Agents
pip install livekit-agents livekit-api livekit-plugins-openai

# OpenTelemetry packages
pip install opentelemetry-sdk opentelemetry-exporter-otlp

# Speech and LLM provider plugins (choose what you need)
pip install livekit-plugins-deepgram livekit-plugins-elevenlabs

# Additional utilities (asyncio ships with Python, so it is not installed here)
pip install python-dotenv

Configure Orq.ai

Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:

Unix/Linux/macOS:

export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=livekit-agents,service.version=1.0.0"
export LIVEKIT_URL="<YOUR_LIVEKIT_URL>"
export LIVEKIT_API_KEY="<YOUR_LIVEKIT_API_KEY>"
export LIVEKIT_API_SECRET="<YOUR_LIVEKIT_API_SECRET>"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"

Windows (PowerShell):

$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=livekit-agents,service.version=1.0.0"
$env:LIVEKIT_URL = "<YOUR_LIVEKIT_URL>"
$env:LIVEKIT_API_KEY = "<YOUR_LIVEKIT_API_KEY>"
$env:LIVEKIT_API_SECRET = "<YOUR_LIVEKIT_API_SECRET>"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"

Using .env file:

OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=livekit-agents,service.version=1.0.0
LIVEKIT_URL=<YOUR_LIVEKIT_URL>
LIVEKIT_API_KEY=<YOUR_LIVEKIT_API_KEY>
LIVEKIT_API_SECRET=<YOUR_LIVEKIT_API_SECRET>
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>
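Whichever approach you use, the agent process must see these variables before the tracing code runs. A small sanity check (a hypothetical helper, not part of either SDK) can catch missing configuration early:

```python
import os

# Variables this guide expects; OPENAI_API_KEY applies when using the OpenAI plugins
REQUIRED_VARS = [
    "OTEL_EXPORTER_OTLP_ENDPOINT",
    "OTEL_EXPORTER_OTLP_HEADERS",
    "LIVEKIT_URL",
    "LIVEKIT_API_KEY",
    "LIVEKIT_API_SECRET",
    "OPENAI_API_KEY",
]

def missing_vars(env=os.environ) -> list[str]:
    """Return the names of required variables that are unset or empty."""
    return [name for name in REQUIRED_VARS if not env.get(name)]

if __name__ == "__main__":
    missing = missing_vars()
    if missing:
        raise SystemExit(f"Missing environment variables: {', '.join(missing)}")
```

Run it once before starting the worker; it exits with a message listing anything unset.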

Integrations

LiveKit supports manual OpenTelemetry integration for comprehensive real-time observability:

Manual OpenTelemetry Setup

Create a tracing configuration module that the agent examples below import:

# tracing.py
import os
from opentelemetry import trace, baggage
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes

# Configure resource
resource = Resource.create({
    ResourceAttributes.SERVICE_NAME: "livekit-agents",
    ResourceAttributes.SERVICE_VERSION: "1.0.0",
    ResourceAttributes.DEPLOYMENT_ENVIRONMENT: os.getenv("ENVIRONMENT", "development"),
})

# Configure exporter
# OTEL_EXPORTER_OTLP_HEADERS has the form "Authorization=Bearer <ORQ_API_KEY>";
# split on the first "=" only so tokens that contain "=" are preserved
header_key, header_value = os.getenv("OTEL_EXPORTER_OTLP_HEADERS").split("=", 1)
exporter = OTLPSpanExporter(
    endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + "/v1/traces",
    headers={header_key: header_value},
)

# Set up tracing
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Get tracer
tracer = trace.get_tracer("livekit", "1.0.0")
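Per the OpenTelemetry specification, OTEL_EXPORTER_OTLP_HEADERS may hold a comma-separated list of key=value pairs. The module above handles the single-header case; a small parser (a sketch, not part of the OpenTelemetry SDK) generalizes it:

```python
def parse_otlp_headers(raw: str) -> dict[str, str]:
    """Parse an OTEL_EXPORTER_OTLP_HEADERS value ("k1=v1,k2=v2") into a dict.

    Splits each pair on the first "=" only, so values such as
    "Bearer abc==" survive intact.
    """
    headers = {}
    for pair in raw.split(","):
        if "=" not in pair:
            continue  # skip malformed fragments
        key, value = pair.split("=", 1)
        headers[key.strip()] = value.strip()
    return headers
```

You can then pass `headers=parse_otlp_headers(os.getenv("OTEL_EXPORTER_OTLP_HEADERS", ""))` to the exporter instead of splitting by hand.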

Examples

Basic Voice Agent with Tracing

import asyncio
from livekit.agents import AutoSubscribe, JobContext, WorkerOptions, cli
from livekit.agents.llm import ChatMessage, ChatRole
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import deepgram, openai
from tracing import tracer
from opentelemetry import trace, baggage

class TracedVoiceAgent:
    def __init__(self):
        self.llm = openai.LLM()
        self.stt = deepgram.STT()
        self.tts = openai.TTS()
        
    async def entrypoint(self, ctx: JobContext):
        """Main agent entrypoint with comprehensive tracing"""
        
        # Set baggage for context propagation
        ctx_with_baggage = baggage.set_baggage("session.id", ctx.room.name or "unknown")
        
        with tracer.start_as_current_span(
            "livekit.session",
            context=ctx_with_baggage,
            attributes={
                "session.room_name": ctx.room.name or "unknown",
                "session.participant_count": len(ctx.room.participants),
                "agent.type": "voice_assistant",
            }
        ) as session_span:
            try:
                await ctx.connect(auto_subscribe=AutoSubscribe.AUDIO_ONLY)
                
                # Create voice assistant with tracing
                assistant = VoiceAssistant(
                    llm=self.llm,
                    stt=self.stt,
                    tts=self.tts,
                    chat_ctx=self._create_initial_context(),
                )
                
                # Set up event handlers with tracing
                assistant.on("user_speech_committed", self._on_user_speech)
                assistant.on("agent_speech_committed", self._on_agent_speech)
                
                session_span.set_attributes({
                    "assistant.llm.model": "gpt-4",
                    "assistant.stt.provider": "deepgram",
                    "assistant.tts.provider": "openai",
                })
                
                # Start the assistant and keep the entrypoint (and the
                # session span) open until the room disconnects
                assistant.start(ctx.room)
                disconnected = asyncio.Event()
                ctx.room.on("disconnected", lambda *args: disconnected.set())
                await disconnected.wait()
                
                session_span.set_attribute("session.status", "completed")
                
            except Exception as e:
                session_span.record_exception(e)
                session_span.set_status(trace.StatusCode.ERROR, str(e))
                raise

    def _create_initial_context(self) -> list[ChatMessage]:
        """Create initial chat context"""
        return [
            ChatMessage(
                role=ChatRole.SYSTEM,
                content="You are a helpful voice assistant. Respond naturally and conversationally."
            )
        ]
    
    def _on_user_speech(self, message: str):
        """Handle user speech events with tracing"""
        with tracer.start_as_current_span(
            "livekit.user_speech",
            attributes={
                "speech.type": "user",
                "speech.text.length": len(message),
            }
        ) as span:
            span.set_attribute("speech.text", message[:200])
            print(f"User said: {message}")
    
    def _on_agent_speech(self, message: str):
        """Handle agent speech events with tracing"""
        with tracer.start_as_current_span(
            "livekit.agent_speech",
            attributes={
                "speech.type": "agent",
                "speech.text.length": len(message),
            }
        ):
            print(f"Agent said: {message}")

# Initialize and run the agent
if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=TracedVoiceAgent().entrypoint,
        )
    )
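The speech handlers above cap transcript text at 200 characters before attaching it to a span. If you record transcripts on several spans, it may help to centralize that policy in one small helper (hypothetical, shown here as a sketch) so attribute payloads stay bounded and truncation is visible in the dashboard:

```python
def safe_text_attribute(text: str, max_len: int = 200) -> str:
    """Truncate transcript text before attaching it as a span attribute,
    marking truncation explicitly so full and partial text are distinguishable."""
    if len(text) <= max_len:
        return text
    return text[:max_len] + "...[truncated]"
```

Usage inside a handler: `span.set_attribute("speech.text", safe_text_attribute(message))`.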

Real-time Performance Monitoring

import asyncio
import time
from dataclasses import dataclass

from livekit import rtc
from livekit.agents import JobContext, WorkerOptions, cli
from livekit.agents.voice_assistant import VoiceAssistant
from livekit.plugins import openai
from opentelemetry import trace
from tracing import tracer

@dataclass
class PerformanceMetrics:
    latency_ms: float
    audio_quality: float
    packet_loss: float

class PerformanceMonitor:
    def __init__(self):
        self.metrics_history = []
        self.session_start_time = time.time()
    
    async def collect_metrics(self, room: rtc.Room) -> PerformanceMetrics:
        """Collect comprehensive performance metrics"""
        with tracer.start_as_current_span(
            "livekit.performance.collect_metrics"
        ) as span:
            # Network metrics (simulated)
            latency_ms = await self._measure_network_latency()
            packet_loss = 0.1  # Mock packet loss
            audio_quality = 0.95  # Mock audio quality
            
            metrics = PerformanceMetrics(
                latency_ms=latency_ms,
                audio_quality=audio_quality,
                packet_loss=packet_loss
            )
            
            span.set_attributes({
                "performance.latency_ms": latency_ms,
                "performance.audio_quality": audio_quality,
                "performance.packet_loss": packet_loss,
            })
            
            self.metrics_history.append(metrics)
            return metrics
    
    async def _measure_network_latency(self) -> float:
        """Measure network latency"""
        start_time = time.time()
        await asyncio.sleep(0.01)  # Simulate measurement
        return (time.time() - start_time) * 1000

class MonitoredVoiceAgent:
    def __init__(self):
        self.performance_monitor = PerformanceMonitor()
        self.llm = openai.LLM()
        self.stt = openai.STT()
        self.tts = openai.TTS()
    
    async def entrypoint(self, ctx: JobContext):
        """Voice agent with performance monitoring"""
        with tracer.start_as_current_span(
            "livekit.monitored_session",
            attributes={
                "session.monitoring": True,
                "session.performance_tracking": True,
            }
        ) as session_span:
            try:
                await ctx.connect()
                
                # Start performance monitoring
                monitor_task = asyncio.create_task(
                    self._performance_monitoring_loop(ctx.room, session_span)
                )
                
                assistant = VoiceAssistant(
                    llm=self.llm,
                    stt=self.stt,
                    tts=self.tts,
                )
                
                # Start the assistant and keep the session open until the room disconnects
                assistant.start(ctx.room)
                disconnected = asyncio.Event()
                ctx.room.on("disconnected", lambda *args: disconnected.set())
                
                try:
                    await disconnected.wait()
                finally:
                    monitor_task.cancel()
                
                await self._generate_performance_summary(session_span)
                
            except Exception as e:
                session_span.record_exception(e)
                session_span.set_status(trace.StatusCode.ERROR, str(e))
                raise
    
    async def _performance_monitoring_loop(self, room: rtc.Room, session_span):
        """Continuous performance monitoring"""
        while True:
            try:
                metrics = await self.performance_monitor.collect_metrics(room)
                
                # Alert on performance issues
                if metrics.latency_ms > 200:
                    session_span.add_event("High latency detected", {
                        "latency_ms": metrics.latency_ms
                    })
                
                await asyncio.sleep(5)  # Monitor every 5 seconds
                
            except asyncio.CancelledError:
                break
    
    async def _generate_performance_summary(self, session_span):
        """Generate final performance summary"""
        if not self.performance_monitor.metrics_history:
            return
        
        metrics = self.performance_monitor.metrics_history
        avg_latency = sum(m.latency_ms for m in metrics) / len(metrics)
        
        session_span.set_attributes({
            "performance.summary.avg_latency_ms": avg_latency,
            "performance.summary.metrics_collected": len(metrics),
        })

# Run the monitored agent
if __name__ == "__main__":
    import livekit.agents.cli as cli
    cli.run_app(
        WorkerOptions(
            entrypoint_fnc=MonitoredVoiceAgent().entrypoint,
        )
    )
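The summary above reports only mean latency. For real-time audio, tail latency usually matters more than the average; a stdlib-only sketch of a richer aggregation (nearest-rank p95) that could replace the computation in `_generate_performance_summary`:

```python
import statistics

def latency_summary(latencies_ms: list[float]) -> dict[str, float]:
    """Aggregate per-interval latency samples into avg, p95, and max."""
    if not latencies_ms:
        return {"avg_ms": 0.0, "p95_ms": 0.0, "max_ms": 0.0}
    ordered = sorted(latencies_ms)
    # Nearest-rank percentile: the ceil(0.95 * n)-th smallest sample
    rank = -(-95 * len(ordered) // 100)  # ceiling division
    return {
        "avg_ms": statistics.fmean(ordered),
        "p95_ms": ordered[rank - 1],
        "max_ms": ordered[-1],
    }
```

The resulting values can be attached with `session_span.set_attributes(...)` using the same `performance.summary.*` attribute names as above.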

Next Steps

  • Verify traces: Check your Orq.ai dashboard to see incoming traces
  • Add custom attributes: Enhance traces with session-specific metadata
  • Set up alerts: Configure monitoring for voice quality degradation
  • Explore metrics: Use trace data for real-time performance optimization
  • Monitor user interactions: Track conversation flow and engagement
