CrewAI

Integrate Orq.ai with CrewAI using OpenTelemetry

Getting Started

CrewAI coordinates multiple agents in complex AI workflows. Tracing CrewAI with Orq.ai shows agent interactions, task execution, tool usage, and overall crew performance, so you can see where a multi-agent system spends its time and where it fails.

Prerequisites

Before you begin, ensure you have:

  • An Orq.ai account and API key
  • CrewAI installed in your project
  • Python 3.10+ (required by current CrewAI releases)
  • OpenAI API key (or other LLM provider credentials)

Install Dependencies

# Core CrewAI and OpenTelemetry packages
pip install crewai opentelemetry-sdk opentelemetry-exporter-otlp

# Additional instrumentation packages
pip install openlit traceloop-sdk

# LLM providers
pip install openai anthropic

# Optional: Advanced tools and integrations
pip install crewai-tools langchain-community

Configure Orq.ai

Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:

Unix/Linux/macOS:

export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=crewai-app,service.version=1.0.0"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"

Windows (PowerShell):

$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=crewai-app,service.version=1.0.0"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"

Using .env file:

OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=crewai-app,service.version=1.0.0
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>
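Before starting the application, it can help to confirm that the variables above are actually set. The following is a minimal sketch using only the standard library. `parse_env_file` and `missing_otel_vars` are hypothetical helper names, not part of Orq.ai or OpenTelemetry, and the parser assumes simple `KEY=VALUE` lines only.

```python
import os

REQUIRED_VARS = (
    "OTEL_EXPORTER_OTLP_ENDPOINT",
    "OTEL_EXPORTER_OTLP_HEADERS",
    "OPENAI_API_KEY",
)


def parse_env_file(text: str) -> dict:
    """Parse simple KEY=VALUE lines; blank lines and # comments are skipped."""
    values = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        # Split on the first "=" only, so a value such as
        # "Authorization=Bearer <key>" keeps its own "=".
        key, value = line.split("=", 1)
        values[key.strip()] = value.strip().strip('"')
    return values


def missing_otel_vars(env=None) -> list:
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Hypothetical .env content that forgets the LLM provider key
sample = """
OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
"""
parsed = parse_env_file(sample)
print(missing_otel_vars(parsed))  # prints ['OPENAI_API_KEY']
```

Calling `missing_otel_vars()` with no argument checks the real process environment instead of a parsed file.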

Integrations

Choose your preferred OpenTelemetry framework for collecting traces:

OpenLit

Auto-instrumentation with minimal setup:

import openlit
from crewai import Agent, Task, Crew

# Initialize OpenLit
openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel", 
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

# Your CrewAI code is automatically traced
researcher = Agent(
    role='Market Research Analyst',
    goal='Gather comprehensive market data and trends',
    backstory='Expert in analyzing market dynamics and consumer behavior'
)

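# expected_output is a required Task field in current CrewAI releases
task = Task(
    description='Research the latest trends in AI and machine learning',
    expected_output='Summary of key AI and machine learning trends',
    agent=researcher
)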

crew = Crew(agents=[researcher], tasks=[task])
result = crew.kickoff()
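The snippet above hard-codes the API key placeholder in source. As a sketch of one alternative, the header string can be built from an environment variable at startup. `ORQ_API_KEY` as a variable name and the `orq_otlp_headers` helper are assumptions made for illustration. The sketch only reproduces the `Authorization=Bearer ...` format already shown in this guide.

```python
import os


def orq_otlp_headers(api_key: str) -> str:
    """Build the header string in the same format used in this guide."""
    api_key = api_key.strip()
    if not api_key:
        raise ValueError("Orq.ai API key is empty")
    return f"Authorization=Bearer {api_key}"


# ORQ_API_KEY is an assumed variable name; "demo-key" is a placeholder
# fallback so the sketch runs without any configuration.
headers = orq_otlp_headers(os.environ.get("ORQ_API_KEY") or "demo-key")

# The result can then be passed as otlp_headers=headers to openlit.init(...)
```

Failing fast on an empty key makes a misconfigured deployment visible at startup rather than as silently missing traces.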

OpenLLMetry

Non-intrusive tracing with decorators:

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow
from crewai import Agent, Task, Crew

Traceloop.init()

@workflow(name="crewai-research-workflow")
def create_research_crew():
    # Define agents
    researcher = Agent(
        role='Research Analyst',
        goal='Conduct thorough research on given topics',
        backstory='Experienced researcher with strong analytical skills'
    )
    
    writer = Agent(
        role='Content Writer',
        goal='Create engaging content based on research findings',
        backstory='Skilled writer with expertise in technical communication'
    )
    
    # Define tasks
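    research_task = Task(
        description='Research the latest developments in quantum computing',
        expected_output='Research notes covering recent quantum computing developments',
        agent=researcher
    )
    
    writing_task = Task(
        description='Write a comprehensive article based on the research findings',
        expected_output='Article draft based on the research notes',
        agent=writer
    )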
    
    # Create and run crew
    crew = Crew(
        agents=[researcher, writer],
        tasks=[research_task, writing_task],
        verbose=True
    )
    
    return crew.kickoff()

result = create_research_crew()

MLflow

MLOps-focused tracing:

import mlflow
from crewai import Agent, Task, Crew

# Enable MLflow tracing
mlflow.autolog()

@mlflow.trace
def run_crewai_pipeline(research_topic: str):
    # Create agents
    data_analyst = Agent(
        role='Data Analyst',
        goal='Analyze data patterns and extract insights',
        backstory='Expert in statistical analysis and data interpretation'
    )
    
    strategist = Agent(
        role='Strategy Consultant',
        goal='Develop actionable strategies based on data insights',
        backstory='Experienced consultant with strategic thinking abilities'
    )
    
    # Create tasks
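    analysis_task = Task(
        description=f'Analyze market data related to {research_topic}',
        expected_output='Summary of data patterns and key insights',
        agent=data_analyst
    )
    
    strategy_task = Task(
        description='Develop strategic recommendations based on the analysis',
        expected_output='List of strategic recommendations',
        agent=strategist
    )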
    
    # Execute crew
    crew = Crew(
        agents=[data_analyst, strategist],
        tasks=[analysis_task, strategy_task]
    )
    
    return crew.kickoff()

result = run_crewai_pipeline("artificial intelligence market trends")

OpenLLMetry with custom tools

Comprehensive CrewAI instrumentation:

from traceloop.sdk import Traceloop
from crewai import Agent, Task, Crew
from crewai.tools import tool

# Initialize tracing
Traceloop.init(
    app_name="crewai-app",
    disable_batch=True
)

@tool("web_search")
def web_search(query: str) -> str:
    """Mock web search tool"""
    return f"Search results for: {query}"

def create_content_crew():
    # Define specialized agents
    researcher = Agent(
        role='Senior Research Analyst',
        goal='Conduct comprehensive research and fact-checking',
        backstory='10+ years experience in market research and data analysis',
        tools=[web_search],
        verbose=True
    )
    
    editor = Agent(
        role='Content Editor',
        goal='Review and refine content for quality and accuracy',
        backstory='Expert editor with strong attention to detail',
        verbose=True
    )
    
    # Define sequential tasks
    research_task = Task(
        description='Research current trends in sustainable technology',
        agent=researcher,
        expected_output='Detailed research report with key findings'
    )
    
    editing_task = Task(
        description='Edit and refine the research report',
        agent=editor,
        expected_output='Polished final report',
        context=[research_task]
    )
    
    # Create crew with process flow
    crew = Crew(
        agents=[researcher, editor],
        tasks=[research_task, editing_task],
        verbose=True,
        memory=True
    )
    
    return crew.kickoff()

result = create_content_crew()

Examples

Basic Multi-Agent Crew

import openlit
from crewai import Agent, Task, Crew
from crewai.process import Process

# Initialize tracing
openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

def basic_crew_example():
    # Define agents with specific roles
    market_researcher = Agent(
        role='Market Researcher',
        goal='Identify market opportunities and trends',
        backstory='Experienced market analyst with deep industry knowledge',
        verbose=True,
        allow_delegation=False
    )
    
    business_analyst = Agent(
        role='Business Analyst',
        goal='Analyze business implications and create recommendations',
        backstory='Strategic thinker with strong analytical skills',
        verbose=True,
        allow_delegation=False
    )
    
    # Define tasks with clear objectives
    market_analysis_task = Task(
        description="""
        Analyze the current state of the AI software development tools market.
        Include key players, market size, growth trends, and emerging opportunities.
        """,
        agent=market_researcher,
        expected_output="Comprehensive market analysis report"
    )
    
    business_strategy_task = Task(
        description="""
        Based on the market analysis, develop strategic recommendations 
        for entering the AI tools market.
        """,
        agent=business_analyst,
        expected_output="Strategic business recommendations document"
    )
    
    # Create and execute crew
    crew = Crew(
        agents=[market_researcher, business_analyst],
        tasks=[market_analysis_task, business_strategy_task],
        process=Process.sequential,
        verbose=True
    )
    
    result = crew.kickoff()
    return result

result = basic_crew_example()
print("Crew execution completed:", result)

Advanced Crew with Custom Tools

import openlit
from crewai import Agent, Task, Crew
from crewai.process import Process  # needed for Process.sequential below
from crewai.tools import tool

openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

@tool("data_analyzer")
def analyze_data(data_description: str) -> str:
    """Analyze data patterns and extract insights"""
    # Mock data analysis
    insights = {
        "trend": "upward",
        "confidence": "high",
        "key_metrics": ["growth_rate", "user_adoption", "market_share"]
    }
    return f"Data analysis for {data_description}: {insights}"

@tool("report_generator")
def generate_report(content: str) -> str:
    """Generate a structured report from content"""
    return f"EXECUTIVE SUMMARY\n\nAnalysis: {content}\n\nRecommendations: Based on the analysis..."

def advanced_crew_with_tools():
    # Data specialist with analysis tools
    data_scientist = Agent(
        role='Senior Data Scientist',
        goal='Extract meaningful insights from complex datasets',
        backstory='PhD in Statistics with 8+ years in data science',
        tools=[analyze_data],
        verbose=True
    )
    
    # Report specialist with documentation tools
    technical_writer = Agent(
        role='Technical Documentation Specialist',
        goal='Create clear, comprehensive technical documentation',
        backstory='Expert in technical communication and document design',
        tools=[generate_report],
        verbose=True
    )
    
    # Quality assurance specialist
    qa_reviewer = Agent(
        role='Quality Assurance Reviewer',
        goal='Ensure accuracy and completeness of all deliverables',
        backstory='Meticulous reviewer with strong attention to detail',
        verbose=True
    )
    
    # Sequential tasks with dependencies
    data_analysis_task = Task(
        description='Analyze customer behavior patterns from the last quarter',
        agent=data_scientist,
        expected_output='Statistical analysis with key behavioral insights'
    )
    
    documentation_task = Task(
        description='Create comprehensive documentation based on data analysis',
        agent=technical_writer,
        expected_output='Professional technical report',
        context=[data_analysis_task]
    )
    
    review_task = Task(
        description='Review and validate the technical documentation for accuracy',
        agent=qa_reviewer,
        expected_output='Quality-assured final report',
        context=[data_analysis_task, documentation_task]
    )
    
    # Create crew with sequential process
    crew = Crew(
        agents=[data_scientist, technical_writer, qa_reviewer],
        tasks=[data_analysis_task, documentation_task, review_task],
        process=Process.sequential,
        verbose=True,
        memory=True
    )
    
    return crew.kickoff()

advanced_result = advanced_crew_with_tools()

Custom Spans for Crew Monitoring

from opentelemetry import trace
import openlit
from crewai import Agent, Task, Crew
from crewai.process import Process

openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

tracer = trace.get_tracer("crewai")

def crew_with_custom_monitoring():
    with tracer.start_as_current_span("crew-execution") as crew_span:
        crew_span.set_attribute("crew.type", "research_and_analysis")
        crew_span.set_attribute("crew.agents_count", 3)
        
        with tracer.start_as_current_span("agent-creation") as agent_span:
            # Create agents
            researcher = Agent(
                role='Lead Researcher',
                goal='Conduct thorough research on assigned topics',
                backstory='Senior researcher with expertise in multiple domains'
            )
            
            analyst = Agent(
                role='Data Analyst',
                goal='Analyze research data and identify patterns',
                backstory='Quantitative analyst with strong statistical background'
            )
            
            coordinator = Agent(
                role='Project Coordinator',
                goal='Coordinate team efforts and ensure quality deliverables',
                backstory='Experienced project manager with team leadership skills'
            )
            
            agent_span.set_attribute("agents.created", 3)
        
        with tracer.start_as_current_span("task-definition") as task_span:
            # Define tasks
            research_task = Task(
                description='Research emerging technologies in renewable energy',
                agent=researcher,
                expected_output='Comprehensive technology assessment'
            )
            
            analysis_task = Task(
                description='Analyze market potential of identified technologies',
                agent=analyst,
                expected_output='Market analysis report with projections'
            )
            
            coordination_task = Task(
                description='Coordinate findings and create executive summary',
                agent=coordinator,
                expected_output='Executive summary with recommendations'
            )
            
            task_span.set_attribute("tasks.defined", 3)
        
        with tracer.start_as_current_span("crew-assembly") as assembly_span:
            crew = Crew(
                agents=[researcher, analyst, coordinator],
                tasks=[research_task, analysis_task, coordination_task],
                process=Process.sequential,
                verbose=True
            )
            assembly_span.set_attribute("crew.process", "sequential")
        
        with tracer.start_as_current_span("crew-kickoff") as execution_span:
            # Record the trace ID as a 32-character hex string for later lookup
            execution_span.set_attribute(
                "execution.trace_id",
                format(execution_span.get_span_context().trace_id, "032x")
            )
            
            result = crew.kickoff()
            
            execution_span.set_attribute("execution.success", True)
            execution_span.set_attribute("result.length", len(str(result)) if result else 0)
        
        crew_span.set_attribute("crew.execution_complete", True)
        return result

monitored_result = crew_with_custom_monitoring()
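Span attributes in OpenTelemetry accept only strings, booleans, numbers, and sequences of those, so nested business metadata has to be flattened before it is attached. The following is a minimal sketch of one way to do that with the standard library. `to_span_attributes` is a hypothetical helper and the metadata values are invented for illustration.

```python
from typing import Any, Dict

PRIMITIVES = (str, bool, int, float)


def to_span_attributes(prefix: str, metadata: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten nested metadata into dotted keys with attribute-friendly values."""
    attributes: Dict[str, Any] = {}
    for key, value in metadata.items():
        name = f"{prefix}.{key}"
        if isinstance(value, dict):
            # Recurse so {"owner": {"team": ...}} becomes "crew.owner.team"
            attributes.update(to_span_attributes(name, value))
        elif isinstance(value, PRIMITIVES):
            attributes[name] = value
        elif isinstance(value, (list, tuple)) and all(isinstance(v, str) for v in value):
            attributes[name] = list(value)
        else:
            # Fall back to a string so unsupported types are not silently dropped
            attributes[name] = str(value)
    return attributes


attrs = to_span_attributes("crew", {
    "type": "research_and_analysis",
    "agents_count": 3,
    "owner": {"team": "growth", "tier": None},
})
print(attrs)
```

With a span in scope, the flattened dictionary can be attached in one call with `crew_span.set_attributes(attrs)`.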

Hierarchical Crew with Manager Agent

import openlit
from crewai import Agent, Task, Crew
from crewai.process import Process

openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

def hierarchical_crew_example():
    # Manager agent who coordinates the team
    project_manager = Agent(
        role='Project Manager',
        goal='Coordinate team activities and ensure project success',
        backstory='Experienced manager with strong leadership and coordination skills',
        allow_delegation=True,
        verbose=True
    )
    
    # Specialized worker agents
    developer = Agent(
        role='Senior Developer',
        goal='Develop high-quality software solutions',
        backstory='Full-stack developer with 7+ years of experience',
        allow_delegation=False,
        verbose=True
    )
    
    designer = Agent(
        role='UX/UI Designer',
        goal='Create intuitive and engaging user experiences',
        backstory='Creative designer with strong user-centered design principles',
        allow_delegation=False,
        verbose=True
    )
    
    tester = Agent(
        role='QA Tester',
        goal='Ensure software quality through comprehensive testing',
        backstory='Detail-oriented tester with expertise in automation',
        allow_delegation=False,
        verbose=True
    )
    
    # High-level coordination task for manager
    project_coordination = Task(
        description="""
        Coordinate the development of a new mobile app for task management.
        Delegate specific tasks to team members and ensure quality standards.
        """,
        agent=project_manager,
        expected_output="Project coordination report with task assignments"
    )
    
    # Specific tasks for team members
    development_task = Task(
        description='Develop the core functionality for the task management app',
        agent=developer,
        expected_output='Working application with core features'
    )
    
    design_task = Task(
        description='Design user interface and user experience for the app',
        agent=designer,
        expected_output='Complete UI/UX design specifications'
    )
    
    testing_task = Task(
        description='Create and execute comprehensive test plans',
        agent=tester,
        expected_output='Test results and quality assurance report'
    )
    
    # Create hierarchical crew; the manager is passed separately from the worker agents
    crew = Crew(
        agents=[developer, designer, tester],
        tasks=[project_coordination, development_task, design_task, testing_task],
        process=Process.hierarchical,
        manager_agent=project_manager,  # Hierarchical process needs manager_agent or manager_llm
        verbose=True
    )
    
    return crew.kickoff()

hierarchical_result = hierarchical_crew_example()

Next Steps

  • Verify traces: Check your Orq.ai dashboard to confirm that traces are arriving
  • Add custom attributes: Enhance traces with business-specific metadata
  • Set up alerts: Configure monitoring for performance degradation
  • Explore metrics: Use trace data for performance optimization
