BeeAI

Integrate Orq.ai with BeeAI using OpenTelemetry

Getting Started

BeeAI is a framework for building intelligent agent swarms and distributed AI systems. Tracing BeeAI with Orq.ai provides comprehensive insights into agent coordination, swarm behavior, task distribution, and collective intelligence to optimize your multi-agent AI applications.

Prerequisites

Before you begin, ensure you have:

  • An Orq.ai account and API key
  • Python 3.8+ installed
  • BeeAI installed in your project
  • API keys for your LLM providers

Install Dependencies

# Core BeeAI framework
pip install beeai

# OpenTelemetry packages
pip install opentelemetry-sdk opentelemetry-exporter-otlp

# LLM providers (choose what you need)
pip install openai anthropic

# Additional utilities for distributed systems
pip install redis celery python-dotenv

Configure Orq.ai

Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:

Unix/Linux/macOS:

export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=beeai-app,service.version=1.0.0"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"

Windows (PowerShell):

$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=beeai-app,service.version=1.0.0"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"

Using .env file:

OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=beeai-app,service.version=1.0.0
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>

Integrations

BeeAI supports manual OpenTelemetry integration for comprehensive swarm observability:

Manual OpenTelemetry Setup

Create comprehensive tracing configuration:

# tracing.py
import os
from opentelemetry import trace, baggage
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagate.textmap import CompositePropagator
from opentelemetry.propagate.b3 import B3MultiFormat
from opentelemetry.propagate.baggage import BaggagePropagator

# Configure resource
resource = Resource.create({
    ResourceAttributes.SERVICE_NAME: "beeai-app",
    ResourceAttributes.SERVICE_VERSION: "1.0.0",
    ResourceAttributes.DEPLOYMENT_ENVIRONMENT: os.getenv("ENVIRONMENT", "development"),
})

# Configure exporter
exporter = OTLPSpanExporter(
    endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + "/v1/traces",
    headers={"Authorization": os.getenv("OTEL_EXPORTER_OTLP_HEADERS").split("=")[1]}
)

# Set up tracing
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)

# Configure propagation for distributed tracing
set_global_textmap(CompositePropagator([
    B3MultiFormat(),
    BaggagePropagator()
]))

# Get tracer
tracer = trace.get_tracer("beeai", "1.0.0")

Examples

Basic Agent Swarm with Tracing

import time
import uuid
from typing import List, Dict, Any
from beeai import Agent, Swarm, Task
from tracing import tracer
from opentelemetry import trace, baggage
from opentelemetry.semconv.trace import SpanAttributes

class TracedBeeAgent:
    def __init__(self, name: str, role: str, model: str = "gpt-4"):
        self.name = name
        self.role = role
        self.model = model
        self.agent_id = str(uuid.uuid4())
        
    def create_agent(self, capabilities: List[str] = None):
        with tracer.start_as_current_span(
            "beeai.agent.create",
            attributes={
                "agent.name": self.name,
                "agent.role": self.role,
                "agent.model": self.model,
                "agent.id": self.agent_id,
                "agent.capabilities.count": len(capabilities) if capabilities else 0,
            }
        ) as span:
            agent = Agent(
                name=self.name,
                role=self.role,
                model=self.model,
                capabilities=capabilities or []
            )
            
            span.set_attributes({
                "agent.capabilities": capabilities or [],
                "agent.created_at": time.time(),
            })
            
            return agent
    
    def execute_task(self, agent: Agent, task: Task, context: Dict[str, Any] = None):
        # Set baggage for distributed tracing
        ctx = baggage.set_baggage("agent.name", self.name)
        ctx = baggage.set_baggage("agent.id", self.agent_id)
        
        with tracer.start_as_current_span(
            "beeai.agent.execute_task",
            context=ctx,
            attributes={
                "agent.name": self.name,
                "agent.role": self.role,
                "task.id": getattr(task, 'id', 'unknown'),
                "task.type": getattr(task, 'type', 'general'),
                "task.priority": getattr(task, 'priority', 'normal'),
            }
        ) as span:
            try:
                start_time = time.time()
                result = agent.execute(task, context=context)
                duration = time.time() - start_time
                
                span.set_attributes({
                    "agent.execution_time": duration,
                    "agent.result.type": type(result).__name__,
                    "agent.status": "completed",
                    "task.completed": True,
                })
                
                return result
                
            except Exception as e:
                span.record_exception(e)
                span.set_status(trace.StatusCode.ERROR, str(e))
                span.set_attributes({
                    "agent.status": "failed",
                    "task.completed": False,
                })
                raise

class TracedSwarm:
    def __init__(self, name: str, coordination_strategy: str = "collaborative"):
        self.name = name
        self.coordination_strategy = coordination_strategy
        self.agents: List[TracedBeeAgent] = []
        self.swarm_id = str(uuid.uuid4())
        
    def add_agent(self, agent: TracedBeeAgent):
        with tracer.start_as_current_span(
            "beeai.swarm.add_agent",
            attributes={
                "swarm.name": self.name,
                "swarm.id": self.swarm_id,
                "agent.name": agent.name,
                "agent.role": agent.role,
            }
        ):
            self.agents.append(agent)
            return self
    
    def distribute_task(self, task: Task, distribution_strategy: str = "best_fit"):
        """Distribute a task across the swarm"""
        with tracer.start_as_current_span(
            "beeai.swarm.distribute_task",
            attributes={
                "swarm.name": self.name,
                "swarm.agents.count": len(self.agents),
                "task.distribution_strategy": distribution_strategy,
                "task.type": getattr(task, 'type', 'general'),
            }
        ) as span:
            try:
                if distribution_strategy == "best_fit":
                    # Find the best agent for the task
                    selected_agent = self._select_best_agent_for_task(task)
                    
                    span.set_attributes({
                        "task.assigned_to": selected_agent.name,
                        "task.assignment_method": "best_fit",
                    })
                    
                    result = selected_agent.execute_task(selected_agent.create_agent(), task)
                    
                elif distribution_strategy == "parallel":
                    # Distribute to multiple agents in parallel
                    results = []
                    for agent in self.agents:
                        with tracer.start_as_current_span(
                            f"beeai.swarm.parallel_execution.{agent.name}",
                            attributes={
                                "agent.name": agent.name,
                                "agent.role": agent.role,
                            }
                        ) as agent_span:
                            agent_result = agent.execute_task(agent.create_agent(), task)
                            results.append({
                                "agent": agent.name,
                                "result": agent_result
                            })
                            
                            agent_span.set_attributes({
                                "agent.result.success": True,
                                "agent.result.type": type(agent_result).__name__,
                            })
                    
                    result = self._merge_parallel_results(results)
                    
                    span.set_attributes({
                        "task.parallel_agents": len(results),
                        "task.assignment_method": "parallel",
                    })
                
                span.set_attributes({
                    "swarm.execution.success": True,
                    "swarm.result.type": type(result).__name__,
                })
                
                return result
                
            except Exception as e:
                span.record_exception(e)
                span.set_status(trace.StatusCode.ERROR, str(e))
                raise
    
    def _select_best_agent_for_task(self, task: Task) -> TracedBeeAgent:
        """Select the best agent for a given task based on capabilities"""
        # Simple heuristic: match task type with agent role
        task_type = getattr(task, 'type', 'general')
        
        for agent in self.agents:
            if task_type.lower() in agent.role.lower():
                return agent
        
        # Fallback to first agent
        return self.agents[0] if self.agents else None
    
    def _merge_parallel_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Merge results from parallel execution"""
        return {
            "parallel_results": results,
            "agent_count": len(results),
            "consensus": self._calculate_consensus(results)
        }
    
    def _calculate_consensus(self, results: List[Dict[str, Any]]) -> str:
        """Calculate consensus from multiple agent results"""
        # Simple consensus: most common result type
        result_types = [type(r["result"]).__name__ for r in results]
        from collections import Counter
        most_common = Counter(result_types).most_common(1)
        return most_common[0][0] if most_common else "no_consensus"

# Example: Research Swarm
def research_swarm_example():
    # Create specialized agents
    researcher_agent = TracedBeeAgent(
        name="researcher",
        role="research_specialist",
        model="gpt-4"
    )
    
    analyst_agent = TracedBeeAgent(
        name="analyst", 
        role="data_analyst",
        model="gpt-4"
    )
    
    writer_agent = TracedBeeAgent(
        name="writer",
        role="content_writer", 
        model="gpt-4"
    )
    
    # Create swarm
    research_swarm = TracedSwarm("research_team", "collaborative")
    research_swarm.add_agent(researcher_agent)
    research_swarm.add_agent(analyst_agent)
    research_swarm.add_agent(writer_agent)
    
    # Create tasks
    research_task = Task(
        id="research_001",
        type="research",
        description="Research the latest trends in renewable energy",
        priority="high"
    )
    
    analysis_task = Task(
        id="analysis_001", 
        type="analysis",
        description="Analyze renewable energy market data",
        priority="medium"
    )
    
    # Execute tasks with different strategies
    print("Executing research task with best-fit strategy...")
    research_result = research_swarm.distribute_task(research_task, "best_fit")
    print(f"Research result: {research_result}")
    
    print("\nExecuting analysis task with parallel strategy...")
    analysis_result = research_swarm.distribute_task(analysis_task, "parallel")
    print(f"Analysis result: {analysis_result}")
    
    return research_result, analysis_result

# Run example
research_swarm_example()

Hierarchical Swarm Organization

from beeai import Agent, Swarm, Task, HierarchicalSwarm
from tracing import tracer
from opentelemetry import trace
import time
from typing import List, Dict, Any

class TracedHierarchicalSwarm:
    def __init__(self, name: str):
        self.name = name
        self.coordinator_agents: List[TracedBeeAgent] = []
        self.worker_swarms: Dict[str, TracedSwarm] = {}
        self.hierarchy_id = str(uuid.uuid4())
        
    def add_coordinator(self, agent: TracedBeeAgent):
        """Add a coordinator agent to manage worker swarms"""
        with tracer.start_as_current_span(
            "beeai.hierarchical.add_coordinator",
            attributes={
                "hierarchy.name": self.name,
                "hierarchy.id": self.hierarchy_id,
                "coordinator.name": agent.name,
                "coordinator.role": agent.role,
            }
        ):
            self.coordinator_agents.append(agent)
            return self
    
    def add_worker_swarm(self, swarm_name: str, swarm: TracedSwarm):
        """Add a worker swarm to the hierarchy"""
        with tracer.start_as_current_span(
            "beeai.hierarchical.add_worker_swarm",
            attributes={
                "hierarchy.name": self.name,
                "worker_swarm.name": swarm_name,
                "worker_swarm.agents.count": len(swarm.agents),
            }
        ):
            self.worker_swarms[swarm_name] = swarm
            return self
    
    def execute_complex_task(self, task: Task):
        """Execute a complex task using hierarchical coordination"""
        with tracer.start_as_current_span(
            "beeai.hierarchical.execute_complex_task",
            attributes={
                "hierarchy.name": self.name,
                "task.id": getattr(task, 'id', 'unknown'),
                "task.complexity": "complex",
                "coordinators.count": len(self.coordinator_agents),
                "worker_swarms.count": len(self.worker_swarms),
            }
        ) as span:
            try:
                # Phase 1: Task decomposition by coordinator
                subtasks = self._decompose_task(task)
                
                span.set_attributes({
                    "task.subtasks.count": len(subtasks),
                    "task.decomposition.success": True,
                })
                
                # Phase 2: Distribute subtasks to worker swarms
                swarm_results = {}
                for subtask in subtasks:
                    swarm_name = self._assign_subtask_to_swarm(subtask)
                    if swarm_name in self.worker_swarms:
                        with tracer.start_as_current_span(
                            f"beeai.hierarchical.subtask.{subtask['id']}",
                            attributes={
                                "subtask.id": subtask["id"],
                                "subtask.assigned_to": swarm_name,
                                "subtask.type": subtask.get("type", "general"),
                            }
                        ) as subtask_span:
                            swarm = self.worker_swarms[swarm_name]
                            subtask_obj = Task(
                                id=subtask["id"],
                                type=subtask.get("type", "general"),
                                description=subtask["description"]
                            )
                            
                            result = swarm.distribute_task(subtask_obj)
                            swarm_results[subtask["id"]] = {
                                "swarm": swarm_name,
                                "result": result,
                                "subtask": subtask
                            }
                            
                            subtask_span.set_attributes({
                                "subtask.execution.success": True,
                                "subtask.result.type": type(result).__name__,
                            })
                
                # Phase 3: Coordinate and synthesize results
                final_result = self._synthesize_results(swarm_results)
                
                span.set_attributes({
                    "hierarchy.execution.success": True,
                    "hierarchy.subtasks.completed": len(swarm_results),
                    "hierarchy.final_result.type": type(final_result).__name__,
                })
                
                return final_result
                
            except Exception as e:
                span.record_exception(e)
                span.set_status(trace.StatusCode.ERROR, str(e))
                raise
    
    def _decompose_task(self, task: Task) -> List[Dict[str, Any]]:
        """Decompose a complex task into subtasks"""
        with tracer.start_as_current_span(
            "beeai.hierarchical.decompose_task",
            attributes={
                "task.id": getattr(task, 'id', 'unknown'),
                "coordinators.available": len(self.coordinator_agents),
            }
        ) as span:
            # Use coordinator agent to decompose the task
            if self.coordinator_agents:
                coordinator = self.coordinator_agents[0]
                
                # Mock decomposition - in real implementation, use the coordinator agent
                subtasks = [
                    {
                        "id": f"{task.id}_subtask_1",
                        "type": "research",
                        "description": f"Research aspect of: {task.description}"
                    },
                    {
                        "id": f"{task.id}_subtask_2", 
                        "type": "analysis",
                        "description": f"Analyze data for: {task.description}"
                    },
                    {
                        "id": f"{task.id}_subtask_3",
                        "type": "synthesis", 
                        "description": f"Synthesize findings for: {task.description}"
                    }
                ]
                
                span.set_attributes({
                    "decomposition.subtasks_created": len(subtasks),
                    "decomposition.coordinator": coordinator.name,
                })
                
                return subtasks
            
            return []
    
    def _assign_subtask_to_swarm(self, subtask: Dict[str, Any]) -> str:
        """Assign a subtask to the most appropriate worker swarm"""
        subtask_type = subtask.get("type", "general")
        
        # Simple assignment based on swarm names containing task type
        for swarm_name in self.worker_swarms:
            if subtask_type.lower() in swarm_name.lower():
                return swarm_name
        
        # Fallback to first available swarm
        return list(self.worker_swarms.keys())[0] if self.worker_swarms else None
    
    def _synthesize_results(self, swarm_results: Dict[str, Any]) -> Dict[str, Any]:
        """Synthesize results from all worker swarms"""
        with tracer.start_as_current_span(
            "beeai.hierarchical.synthesize_results",
            attributes={
                "synthesis.input_count": len(swarm_results),
                "synthesis.swarms_involved": list(set(r["swarm"] for r in swarm_results.values())),
            }
        ) as span:
            synthesis = {
                "task_completed": True,
                "subtask_results": swarm_results,
                "summary": f"Successfully completed {len(swarm_results)} subtasks",
                "swarms_utilized": list(set(r["swarm"] for r in swarm_results.values())),
                "synthesis_timestamp": time.time()
            }
            
            span.set_attributes({
                "synthesis.success": True,
                "synthesis.subtasks_processed": len(swarm_results),
                "synthesis.swarms_count": len(synthesis["swarms_utilized"]),
            })
            
            return synthesis

# Example: Hierarchical Research Organization
def hierarchical_swarm_example():
    # Create coordinator
    coordinator_agent = TracedBeeAgent(
        name="project_coordinator",
        role="project_manager",
        model="gpt-4"
    )
    
    # Create specialized worker swarms
    # Research swarm
    research_swarm = TracedSwarm("research_specialists", "collaborative")
    research_swarm.add_agent(TracedBeeAgent("senior_researcher", "research_lead", "gpt-4"))
    research_swarm.add_agent(TracedBeeAgent("junior_researcher", "research_assistant", "gpt-3.5"))
    
    # Analysis swarm  
    analysis_swarm = TracedSwarm("data_analysts", "parallel")
    analysis_swarm.add_agent(TracedBeeAgent("statistical_analyst", "statistics_expert", "gpt-4"))
    analysis_swarm.add_agent(TracedBeeAgent("ml_analyst", "machine_learning_expert", "gpt-4"))
    
    # Synthesis swarm
    synthesis_swarm = TracedSwarm("content_creators", "collaborative") 
    synthesis_swarm.add_agent(TracedBeeAgent("technical_writer", "documentation_specialist", "gpt-4"))
    synthesis_swarm.add_agent(TracedBeeAgent("editor", "content_editor", "gpt-4"))
    
    # Create hierarchical organization
    hierarchy = TracedHierarchicalSwarm("research_organization")
    hierarchy.add_coordinator(coordinator_agent)
    hierarchy.add_worker_swarm("research_specialists", research_swarm)
    hierarchy.add_worker_swarm("data_analysts", analysis_swarm)
    hierarchy.add_worker_swarm("content_creators", synthesis_swarm)
    
    # Execute complex task
    complex_task = Task(
        id="complex_research_001",
        type="complex_research",
        description="Comprehensive analysis of AI impact on healthcare industry",
        priority="critical"
    )
    
    print("Executing complex task with hierarchical coordination...")
    result = hierarchy.execute_complex_task(complex_task)
    
    print(f"Hierarchical execution completed!")
    print(f"Subtasks completed: {result['subtask_results']}")
    print(f"Swarms utilized: {result['swarms_utilized']}")
    
    return result

# Run hierarchical example
hierarchical_swarm_example()

Swarm Intelligence and Emergent Behavior

from beeai import Agent, Swarm, Task
from tracing import tracer
from opentelemetry import trace
import time
import random
from typing import List, Dict, Any

class IntelligentSwarm:
    def __init__(self, name: str, intelligence_mode: str = "collective"):
        self.name = name
        self.intelligence_mode = intelligence_mode
        self.agents: List[TracedBeeAgent] = []
        self.shared_knowledge: Dict[str, Any] = {}
        self.swarm_memory: List[Dict[str, Any]] = []
        
    def add_intelligent_agent(self, agent: TracedBeeAgent, learning_rate: float = 0.1):
        """Add an agent with learning capabilities to the swarm"""
        with tracer.start_as_current_span(
            "beeai.intelligent_swarm.add_agent",
            attributes={
                "swarm.name": self.name,
                "agent.name": agent.name,
                "agent.learning_rate": learning_rate,
                "swarm.intelligence_mode": self.intelligence_mode,
            }
        ):
            agent.learning_rate = learning_rate
            agent.experience = []
            self.agents.append(agent)
            return self
    
    def emergent_problem_solving(self, problem: Task, iterations: int = 3):
        """Solve problems through emergent swarm intelligence"""
        with tracer.start_as_current_span(
            "beeai.intelligent_swarm.emergent_solving",
            attributes={
                "swarm.name": self.name,
                "problem.id": getattr(problem, 'id', 'unknown'),
                "swarm.agents.count": len(self.agents),
                "solving.iterations": iterations,
                "solving.mode": self.intelligence_mode,
            }
        ) as span:
            solutions = []
            collective_intelligence = 0.0
            
            for iteration in range(iterations):
                with tracer.start_as_current_span(
                    f"beeai.swarm.iteration.{iteration + 1}",
                    attributes={
                        "iteration.number": iteration + 1,
                        "iteration.total": iterations,
                        "swarm.collective_intelligence": collective_intelligence,
                    }
                ) as iteration_span:
                    
                    iteration_solutions = []
                    
                    # Each agent attempts to solve the problem
                    for agent in self.agents:
                        with tracer.start_as_current_span(
                            f"beeai.agent.problem_solving.{agent.name}",
                            attributes={
                                "agent.name": agent.name,
                                "agent.experience.count": len(getattr(agent, 'experience', [])),
                            }
                        ) as agent_span:
                            # Agent uses shared knowledge and personal experience
                            solution = self._agent_solve_with_intelligence(
                                agent, problem, collective_intelligence
                            )
                            
                            iteration_solutions.append({
                                "agent": agent.name,
                                "solution": solution,
                                "confidence": self._calculate_confidence(agent, solution)
                            })
                            
                            agent_span.set_attributes({
                                "agent.solution.confidence": iteration_solutions[-1]["confidence"],
                                "agent.solution.type": type(solution).__name__,
                            })
                    
                    # Update shared knowledge through swarm communication
                    self._update_shared_knowledge(iteration_solutions)
                    
                    # Calculate emergent collective intelligence
                    collective_intelligence = self._calculate_collective_intelligence(
                        iteration_solutions
                    )
                    
                    # Learn from iteration
                    self._swarm_learning(iteration_solutions, collective_intelligence)
                    
                    solutions.extend(iteration_solutions)
                    
                    iteration_span.set_attributes({
                        "iteration.solutions_generated": len(iteration_solutions),
                        "iteration.collective_intelligence": collective_intelligence,
                        "iteration.shared_knowledge_size": len(self.shared_knowledge),
                    })
            
            # Synthesize final solution through collective intelligence
            final_solution = self._synthesize_collective_solution(solutions)
            
            span.set_attributes({
                "swarm.final_collective_intelligence": collective_intelligence,
                "swarm.total_solutions_generated": len(solutions),
                "swarm.final_solution.confidence": final_solution.get("confidence", 0.0),
                "swarm.learning.memory_size": len(self.swarm_memory),
            })
            
            return final_solution
    
    def _agent_solve_with_intelligence(self, agent: TracedBeeAgent, problem: Task, collective_intelligence: float):
        """Agent solves problem using individual and collective intelligence"""
        # Simulate intelligent problem solving
        base_solution = f"Solution by {agent.name}: {problem.description}"
        
        # Apply collective intelligence boost
        intelligence_boost = 1.0 + (collective_intelligence * 0.5)
        
        # Add learning from experience
        if hasattr(agent, 'experience') and agent.experience:
            experience_boost = len(agent.experience) * 0.1
            intelligence_boost += experience_boost
        
        enhanced_solution = {
            "base_solution": base_solution,
            "intelligence_boost": intelligence_boost,
            "collective_influence": collective_intelligence,
            "agent_experience": len(getattr(agent, 'experience', [])),
        }
        
        return enhanced_solution
    
    def _calculate_confidence(self, agent: TracedBeeAgent, solution: Dict[str, Any]) -> float:
        """Calculate agent's confidence in the solution"""
        base_confidence = 0.5
        
        # Boost confidence based on experience
        experience_boost = len(getattr(agent, 'experience', [])) * 0.05
        
        # Boost confidence based on intelligence boost
        intelligence_boost = solution.get("intelligence_boost", 1.0) * 0.1
        
        confidence = min(1.0, base_confidence + experience_boost + intelligence_boost)
        return confidence
    
    def _update_shared_knowledge(self, iteration_solutions: List[Dict[str, Any]]):
        """Update swarm's shared knowledge base"""
        with tracer.start_as_current_span(
            "beeai.swarm.update_shared_knowledge",
            attributes={
                "knowledge.solutions_to_process": len(iteration_solutions),
                "knowledge.current_size": len(self.shared_knowledge),
            }
        ) as span:
            for solution_data in iteration_solutions:
                agent_name = solution_data["agent"]
                solution = solution_data["solution"]
                confidence = solution_data["confidence"]
                
                # Add high-confidence solutions to shared knowledge
                if confidence > 0.7:
                    key = f"high_confidence_from_{agent_name}"
                    self.shared_knowledge[key] = {
                        "solution": solution,
                        "confidence": confidence,
                        "timestamp": time.time()
                    }
            
            span.set_attributes({
                "knowledge.updated_size": len(self.shared_knowledge),
                "knowledge.high_confidence_additions": len([s for s in iteration_solutions if s["confidence"] > 0.7]),
            })
    
    def _calculate_collective_intelligence(self, iteration_solutions: List[Dict[str, Any]]) -> float:
        """Calculate emergent collective intelligence of the swarm"""
        if not iteration_solutions:
            return 0.0
        
        # Average confidence across all agents
        avg_confidence = sum(s["confidence"] for s in iteration_solutions) / len(iteration_solutions)
        
        # Diversity bonus (different solution approaches)
        solution_types = set(type(s["solution"]).__name__ for s in iteration_solutions)
        diversity_bonus = len(solution_types) * 0.1
        
        # Shared knowledge bonus
        knowledge_bonus = len(self.shared_knowledge) * 0.05
        
        collective_intelligence = min(1.0, avg_confidence + diversity_bonus + knowledge_bonus)
        return collective_intelligence
    
    def _swarm_learning(self, iteration_solutions: List[Dict[str, Any]], collective_intelligence: float):
        """Enable swarm-wide learning from the iteration"""
        with tracer.start_as_current_span(
            "beeai.swarm.learning",
            attributes={
                "learning.collective_intelligence": collective_intelligence,
                "learning.solutions_count": len(iteration_solutions),
            }
        ) as span:
            # Update each agent's experience
            for agent in self.agents:
                if hasattr(agent, 'experience'):
                    agent.experience.append({
                        "iteration_intelligence": collective_intelligence,
                        "solutions_observed": len(iteration_solutions),
                        "timestamp": time.time()
                    })
                    
                    # Limit experience memory
                    if len(agent.experience) > 10:
                        agent.experience = agent.experience[-10:]
            
            # Update swarm memory
            self.swarm_memory.append({
                "collective_intelligence": collective_intelligence,
                "solutions_generated": len(iteration_solutions),
                "timestamp": time.time()
            })
            
            # Limit swarm memory
            if len(self.swarm_memory) > 20:
                self.swarm_memory = self.swarm_memory[-20:]
            
            span.set_attributes({
                "learning.agents_updated": len(self.agents),
                "learning.swarm_memory_size": len(self.swarm_memory),
            })
    
    def _synthesize_collective_solution(self, all_solutions: List[Dict[str, Any]]) -> Dict[str, Any]:
        """Synthesize the best collective solution from all iterations"""
        with tracer.start_as_current_span(
            "beeai.swarm.synthesize_collective_solution",
            attributes={
                "synthesis.total_solutions": len(all_solutions),
            }
        ) as span:
            if not all_solutions:
                return {"error": "No solutions generated"}
            
            # Find highest confidence solutions
            high_confidence_solutions = [s for s in all_solutions if s["confidence"] > 0.8]
            
            if not high_confidence_solutions:
                high_confidence_solutions = sorted(all_solutions, key=lambda x: x["confidence"], reverse=True)[:3]
            
            # Create collective solution
            collective_solution = {
                "collective_solution": "Synthesized from swarm intelligence",
                "contributing_agents": list(set(s["agent"] for s in high_confidence_solutions)),
                "confidence": sum(s["confidence"] for s in high_confidence_solutions) / len(high_confidence_solutions),
                "solutions_considered": len(all_solutions),
                "high_confidence_count": len(high_confidence_solutions),
                "swarm_intelligence_level": self._calculate_collective_intelligence(all_solutions),
                "synthesis_timestamp": time.time()
            }
            
            span.set_attributes({
                "synthesis.high_confidence_solutions": len(high_confidence_solutions),
                "synthesis.contributing_agents": len(collective_solution["contributing_agents"]),
                "synthesis.final_confidence": collective_solution["confidence"],
                "synthesis.intelligence_level": collective_solution["swarm_intelligence_level"],
            })
            
            return collective_solution

# Example: Intelligent Problem-Solving Swarm
def intelligent_swarm_example():
    # Create intelligent swarm
    swarm = IntelligentSwarm("problem_solving_collective", "collective")
    
    # Add diverse intelligent agents
    swarm.add_intelligent_agent(
        TracedBeeAgent("analytical_thinker", "analyst", "gpt-4"), 
        learning_rate=0.15
    )
    swarm.add_intelligent_agent(
        TracedBeeAgent("creative_solver", "creative", "gpt-4"),
        learning_rate=0.12
    )
    swarm.add_intelligent_agent(
        TracedBeeAgent("practical_implementer", "implementer", "gpt-4"),
        learning_rate=0.10
    )
    swarm.add_intelligent_agent(
        TracedBeeAgent("systems_thinker", "systems_analyst", "gpt-4"),
        learning_rate=0.13
    )
    
    # Complex problem requiring collective intelligence
    complex_problem = Task(
        id="complex_001",
        type="optimization",
        description="Optimize urban traffic flow while reducing emissions and improving safety",
        priority="critical"
    )
    
    print("Solving complex problem through swarm intelligence...")
    solution = swarm.emergent_problem_solving(complex_problem, iterations=4)
    
    print(f"\nCollective Solution Generated!")
    print(f"Contributing agents: {solution['contributing_agents']}")
    print(f"Final confidence: {solution['confidence']:.2f}")
    print(f"Swarm intelligence level: {solution['swarm_intelligence_level']:.2f}")
    print(f"Solutions considered: {solution['solutions_considered']}")
    
    return solution

# Run intelligent swarm example
intelligent_swarm_example()

Next Steps

Verify traces: Check your Orq.ai dashboard to see incoming traces
Add custom attributes: Enhance traces with swarm-specific metadata
Set up alerts: Configure monitoring for swarm coordination failures
Explore metrics: Use trace data for swarm optimization
Monitor collective intelligence: Track emergent behavior patterns

Related Documentation

Support