BeeAI
Integrate Orq.ai with BeeAI using OpenTelemetry
Getting Started
BeeAI is a framework for building intelligent agent swarms and distributed AI systems. Tracing BeeAI with Orq.ai provides comprehensive insights into agent coordination, swarm behavior, task distribution, and collective intelligence to optimize your multi-agent AI applications.
Prerequisites
Before you begin, ensure you have:
- An Orq.ai account and API key
- Python 3.8+ installed
- BeeAI installed in your project
- API keys for your LLM providers
Install Dependencies
# Core BeeAI framework
pip install beeai
# OpenTelemetry packages
pip install opentelemetry-sdk opentelemetry-exporter-otlp
# LLM providers (choose what you need)
pip install openai anthropic
# Additional utilities for distributed systems
pip install redis celery python-dotenv
Configure Orq.ai
Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:
Unix/Linux/macOS:
export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=beeai-app,service.version=1.0.0"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
Windows (PowerShell):
$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=beeai-app,service.version=1.0.0"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"
Using .env file:
OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=beeai-app,service.version=1.0.0
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>
Integrations
BeeAI supports manual OpenTelemetry integration for comprehensive swarm observability:
Manual OpenTelemetry Setup
Create comprehensive tracing configuration:
# tracing.py
import os
from opentelemetry import trace, baggage
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.propagate import set_global_textmap
from opentelemetry.propagate.textmap import CompositePropagator
from opentelemetry.propagate.b3 import B3MultiFormat
from opentelemetry.propagate.baggage import BaggagePropagator
# Configure resource
resource = Resource.create({
ResourceAttributes.SERVICE_NAME: "beeai-app",
ResourceAttributes.SERVICE_VERSION: "1.0.0",
ResourceAttributes.DEPLOYMENT_ENVIRONMENT: os.getenv("ENVIRONMENT", "development"),
})
# Configure exporter
exporter = OTLPSpanExporter(
endpoint=os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT") + "/v1/traces",
headers={"Authorization": os.getenv("OTEL_EXPORTER_OTLP_HEADERS").split("=")[1]}
)
# Set up tracing
provider = TracerProvider(resource=resource)
processor = BatchSpanProcessor(exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Configure propagation for distributed tracing
set_global_textmap(CompositePropagator([
B3MultiFormat(),
BaggagePropagator()
]))
# Get tracer
tracer = trace.get_tracer("beeai", "1.0.0")
Examples
Basic Agent Swarm with Tracing
import time
import uuid
from typing import List, Dict, Any
from beeai import Agent, Swarm, Task
from tracing import tracer
from opentelemetry import trace, baggage
from opentelemetry.semconv.trace import SpanAttributes
class TracedBeeAgent:
def __init__(self, name: str, role: str, model: str = "gpt-4"):
self.name = name
self.role = role
self.model = model
self.agent_id = str(uuid.uuid4())
def create_agent(self, capabilities: List[str] = None):
with tracer.start_as_current_span(
"beeai.agent.create",
attributes={
"agent.name": self.name,
"agent.role": self.role,
"agent.model": self.model,
"agent.id": self.agent_id,
"agent.capabilities.count": len(capabilities) if capabilities else 0,
}
) as span:
agent = Agent(
name=self.name,
role=self.role,
model=self.model,
capabilities=capabilities or []
)
span.set_attributes({
"agent.capabilities": capabilities or [],
"agent.created_at": time.time(),
})
return agent
def execute_task(self, agent: Agent, task: Task, context: Dict[str, Any] = None):
# Set baggage for distributed tracing
ctx = baggage.set_baggage("agent.name", self.name)
ctx = baggage.set_baggage("agent.id", self.agent_id)
with tracer.start_as_current_span(
"beeai.agent.execute_task",
context=ctx,
attributes={
"agent.name": self.name,
"agent.role": self.role,
"task.id": getattr(task, 'id', 'unknown'),
"task.type": getattr(task, 'type', 'general'),
"task.priority": getattr(task, 'priority', 'normal'),
}
) as span:
try:
start_time = time.time()
result = agent.execute(task, context=context)
duration = time.time() - start_time
span.set_attributes({
"agent.execution_time": duration,
"agent.result.type": type(result).__name__,
"agent.status": "completed",
"task.completed": True,
})
return result
except Exception as e:
span.record_exception(e)
span.set_status(trace.StatusCode.ERROR, str(e))
span.set_attributes({
"agent.status": "failed",
"task.completed": False,
})
raise
class TracedSwarm:
def __init__(self, name: str, coordination_strategy: str = "collaborative"):
self.name = name
self.coordination_strategy = coordination_strategy
self.agents: List[TracedBeeAgent] = []
self.swarm_id = str(uuid.uuid4())
def add_agent(self, agent: TracedBeeAgent):
with tracer.start_as_current_span(
"beeai.swarm.add_agent",
attributes={
"swarm.name": self.name,
"swarm.id": self.swarm_id,
"agent.name": agent.name,
"agent.role": agent.role,
}
):
self.agents.append(agent)
return self
def distribute_task(self, task: Task, distribution_strategy: str = "best_fit"):
"""Distribute a task across the swarm"""
with tracer.start_as_current_span(
"beeai.swarm.distribute_task",
attributes={
"swarm.name": self.name,
"swarm.agents.count": len(self.agents),
"task.distribution_strategy": distribution_strategy,
"task.type": getattr(task, 'type', 'general'),
}
) as span:
try:
if distribution_strategy == "best_fit":
# Find the best agent for the task
selected_agent = self._select_best_agent_for_task(task)
span.set_attributes({
"task.assigned_to": selected_agent.name,
"task.assignment_method": "best_fit",
})
result = selected_agent.execute_task(selected_agent.create_agent(), task)
elif distribution_strategy == "parallel":
# Distribute to multiple agents in parallel
results = []
for agent in self.agents:
with tracer.start_as_current_span(
f"beeai.swarm.parallel_execution.{agent.name}",
attributes={
"agent.name": agent.name,
"agent.role": agent.role,
}
) as agent_span:
agent_result = agent.execute_task(agent.create_agent(), task)
results.append({
"agent": agent.name,
"result": agent_result
})
agent_span.set_attributes({
"agent.result.success": True,
"agent.result.type": type(agent_result).__name__,
})
result = self._merge_parallel_results(results)
span.set_attributes({
"task.parallel_agents": len(results),
"task.assignment_method": "parallel",
})
span.set_attributes({
"swarm.execution.success": True,
"swarm.result.type": type(result).__name__,
})
return result
except Exception as e:
span.record_exception(e)
span.set_status(trace.StatusCode.ERROR, str(e))
raise
def _select_best_agent_for_task(self, task: Task) -> TracedBeeAgent:
"""Select the best agent for a given task based on capabilities"""
# Simple heuristic: match task type with agent role
task_type = getattr(task, 'type', 'general')
for agent in self.agents:
if task_type.lower() in agent.role.lower():
return agent
# Fallback to first agent
return self.agents[0] if self.agents else None
def _merge_parallel_results(self, results: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Merge results from parallel execution"""
return {
"parallel_results": results,
"agent_count": len(results),
"consensus": self._calculate_consensus(results)
}
def _calculate_consensus(self, results: List[Dict[str, Any]]) -> str:
"""Calculate consensus from multiple agent results"""
# Simple consensus: most common result type
result_types = [type(r["result"]).__name__ for r in results]
from collections import Counter
most_common = Counter(result_types).most_common(1)
return most_common[0][0] if most_common else "no_consensus"
# Example: Research Swarm
def research_swarm_example():
# Create specialized agents
researcher_agent = TracedBeeAgent(
name="researcher",
role="research_specialist",
model="gpt-4"
)
analyst_agent = TracedBeeAgent(
name="analyst",
role="data_analyst",
model="gpt-4"
)
writer_agent = TracedBeeAgent(
name="writer",
role="content_writer",
model="gpt-4"
)
# Create swarm
research_swarm = TracedSwarm("research_team", "collaborative")
research_swarm.add_agent(researcher_agent)
research_swarm.add_agent(analyst_agent)
research_swarm.add_agent(writer_agent)
# Create tasks
research_task = Task(
id="research_001",
type="research",
description="Research the latest trends in renewable energy",
priority="high"
)
analysis_task = Task(
id="analysis_001",
type="analysis",
description="Analyze renewable energy market data",
priority="medium"
)
# Execute tasks with different strategies
print("Executing research task with best-fit strategy...")
research_result = research_swarm.distribute_task(research_task, "best_fit")
print(f"Research result: {research_result}")
print("\nExecuting analysis task with parallel strategy...")
analysis_result = research_swarm.distribute_task(analysis_task, "parallel")
print(f"Analysis result: {analysis_result}")
return research_result, analysis_result
# Run example
research_swarm_example()
Hierarchical Swarm Organization
from beeai import Agent, Swarm, Task, HierarchicalSwarm
from tracing import tracer
from opentelemetry import trace
import time
from typing import List, Dict, Any
class TracedHierarchicalSwarm:
def __init__(self, name: str):
self.name = name
self.coordinator_agents: List[TracedBeeAgent] = []
self.worker_swarms: Dict[str, TracedSwarm] = {}
self.hierarchy_id = str(uuid.uuid4())
def add_coordinator(self, agent: TracedBeeAgent):
"""Add a coordinator agent to manage worker swarms"""
with tracer.start_as_current_span(
"beeai.hierarchical.add_coordinator",
attributes={
"hierarchy.name": self.name,
"hierarchy.id": self.hierarchy_id,
"coordinator.name": agent.name,
"coordinator.role": agent.role,
}
):
self.coordinator_agents.append(agent)
return self
def add_worker_swarm(self, swarm_name: str, swarm: TracedSwarm):
"""Add a worker swarm to the hierarchy"""
with tracer.start_as_current_span(
"beeai.hierarchical.add_worker_swarm",
attributes={
"hierarchy.name": self.name,
"worker_swarm.name": swarm_name,
"worker_swarm.agents.count": len(swarm.agents),
}
):
self.worker_swarms[swarm_name] = swarm
return self
def execute_complex_task(self, task: Task):
"""Execute a complex task using hierarchical coordination"""
with tracer.start_as_current_span(
"beeai.hierarchical.execute_complex_task",
attributes={
"hierarchy.name": self.name,
"task.id": getattr(task, 'id', 'unknown'),
"task.complexity": "complex",
"coordinators.count": len(self.coordinator_agents),
"worker_swarms.count": len(self.worker_swarms),
}
) as span:
try:
# Phase 1: Task decomposition by coordinator
subtasks = self._decompose_task(task)
span.set_attributes({
"task.subtasks.count": len(subtasks),
"task.decomposition.success": True,
})
# Phase 2: Distribute subtasks to worker swarms
swarm_results = {}
for subtask in subtasks:
swarm_name = self._assign_subtask_to_swarm(subtask)
if swarm_name in self.worker_swarms:
with tracer.start_as_current_span(
f"beeai.hierarchical.subtask.{subtask['id']}",
attributes={
"subtask.id": subtask["id"],
"subtask.assigned_to": swarm_name,
"subtask.type": subtask.get("type", "general"),
}
) as subtask_span:
swarm = self.worker_swarms[swarm_name]
subtask_obj = Task(
id=subtask["id"],
type=subtask.get("type", "general"),
description=subtask["description"]
)
result = swarm.distribute_task(subtask_obj)
swarm_results[subtask["id"]] = {
"swarm": swarm_name,
"result": result,
"subtask": subtask
}
subtask_span.set_attributes({
"subtask.execution.success": True,
"subtask.result.type": type(result).__name__,
})
# Phase 3: Coordinate and synthesize results
final_result = self._synthesize_results(swarm_results)
span.set_attributes({
"hierarchy.execution.success": True,
"hierarchy.subtasks.completed": len(swarm_results),
"hierarchy.final_result.type": type(final_result).__name__,
})
return final_result
except Exception as e:
span.record_exception(e)
span.set_status(trace.StatusCode.ERROR, str(e))
raise
def _decompose_task(self, task: Task) -> List[Dict[str, Any]]:
"""Decompose a complex task into subtasks"""
with tracer.start_as_current_span(
"beeai.hierarchical.decompose_task",
attributes={
"task.id": getattr(task, 'id', 'unknown'),
"coordinators.available": len(self.coordinator_agents),
}
) as span:
# Use coordinator agent to decompose the task
if self.coordinator_agents:
coordinator = self.coordinator_agents[0]
# Mock decomposition - in real implementation, use the coordinator agent
subtasks = [
{
"id": f"{task.id}_subtask_1",
"type": "research",
"description": f"Research aspect of: {task.description}"
},
{
"id": f"{task.id}_subtask_2",
"type": "analysis",
"description": f"Analyze data for: {task.description}"
},
{
"id": f"{task.id}_subtask_3",
"type": "synthesis",
"description": f"Synthesize findings for: {task.description}"
}
]
span.set_attributes({
"decomposition.subtasks_created": len(subtasks),
"decomposition.coordinator": coordinator.name,
})
return subtasks
return []
def _assign_subtask_to_swarm(self, subtask: Dict[str, Any]) -> str:
"""Assign a subtask to the most appropriate worker swarm"""
subtask_type = subtask.get("type", "general")
# Simple assignment based on swarm names containing task type
for swarm_name in self.worker_swarms:
if subtask_type.lower() in swarm_name.lower():
return swarm_name
# Fallback to first available swarm
return list(self.worker_swarms.keys())[0] if self.worker_swarms else None
def _synthesize_results(self, swarm_results: Dict[str, Any]) -> Dict[str, Any]:
"""Synthesize results from all worker swarms"""
with tracer.start_as_current_span(
"beeai.hierarchical.synthesize_results",
attributes={
"synthesis.input_count": len(swarm_results),
"synthesis.swarms_involved": list(set(r["swarm"] for r in swarm_results.values())),
}
) as span:
synthesis = {
"task_completed": True,
"subtask_results": swarm_results,
"summary": f"Successfully completed {len(swarm_results)} subtasks",
"swarms_utilized": list(set(r["swarm"] for r in swarm_results.values())),
"synthesis_timestamp": time.time()
}
span.set_attributes({
"synthesis.success": True,
"synthesis.subtasks_processed": len(swarm_results),
"synthesis.swarms_count": len(synthesis["swarms_utilized"]),
})
return synthesis
# Example: Hierarchical Research Organization
def hierarchical_swarm_example():
# Create coordinator
coordinator_agent = TracedBeeAgent(
name="project_coordinator",
role="project_manager",
model="gpt-4"
)
# Create specialized worker swarms
# Research swarm
research_swarm = TracedSwarm("research_specialists", "collaborative")
research_swarm.add_agent(TracedBeeAgent("senior_researcher", "research_lead", "gpt-4"))
research_swarm.add_agent(TracedBeeAgent("junior_researcher", "research_assistant", "gpt-3.5"))
# Analysis swarm
analysis_swarm = TracedSwarm("data_analysts", "parallel")
analysis_swarm.add_agent(TracedBeeAgent("statistical_analyst", "statistics_expert", "gpt-4"))
analysis_swarm.add_agent(TracedBeeAgent("ml_analyst", "machine_learning_expert", "gpt-4"))
# Synthesis swarm
synthesis_swarm = TracedSwarm("content_creators", "collaborative")
synthesis_swarm.add_agent(TracedBeeAgent("technical_writer", "documentation_specialist", "gpt-4"))
synthesis_swarm.add_agent(TracedBeeAgent("editor", "content_editor", "gpt-4"))
# Create hierarchical organization
hierarchy = TracedHierarchicalSwarm("research_organization")
hierarchy.add_coordinator(coordinator_agent)
hierarchy.add_worker_swarm("research_specialists", research_swarm)
hierarchy.add_worker_swarm("data_analysts", analysis_swarm)
hierarchy.add_worker_swarm("content_creators", synthesis_swarm)
# Execute complex task
complex_task = Task(
id="complex_research_001",
type="complex_research",
description="Comprehensive analysis of AI impact on healthcare industry",
priority="critical"
)
print("Executing complex task with hierarchical coordination...")
result = hierarchy.execute_complex_task(complex_task)
print(f"Hierarchical execution completed!")
print(f"Subtasks completed: {result['subtask_results']}")
print(f"Swarms utilized: {result['swarms_utilized']}")
return result
# Run hierarchical example
hierarchical_swarm_example()
Swarm Intelligence and Emergent Behavior
from beeai import Agent, Swarm, Task
from tracing import tracer
from opentelemetry import trace
import time
import random
from typing import List, Dict, Any
class IntelligentSwarm:
def __init__(self, name: str, intelligence_mode: str = "collective"):
self.name = name
self.intelligence_mode = intelligence_mode
self.agents: List[TracedBeeAgent] = []
self.shared_knowledge: Dict[str, Any] = {}
self.swarm_memory: List[Dict[str, Any]] = []
def add_intelligent_agent(self, agent: TracedBeeAgent, learning_rate: float = 0.1):
"""Add an agent with learning capabilities to the swarm"""
with tracer.start_as_current_span(
"beeai.intelligent_swarm.add_agent",
attributes={
"swarm.name": self.name,
"agent.name": agent.name,
"agent.learning_rate": learning_rate,
"swarm.intelligence_mode": self.intelligence_mode,
}
):
agent.learning_rate = learning_rate
agent.experience = []
self.agents.append(agent)
return self
def emergent_problem_solving(self, problem: Task, iterations: int = 3):
"""Solve problems through emergent swarm intelligence"""
with tracer.start_as_current_span(
"beeai.intelligent_swarm.emergent_solving",
attributes={
"swarm.name": self.name,
"problem.id": getattr(problem, 'id', 'unknown'),
"swarm.agents.count": len(self.agents),
"solving.iterations": iterations,
"solving.mode": self.intelligence_mode,
}
) as span:
solutions = []
collective_intelligence = 0.0
for iteration in range(iterations):
with tracer.start_as_current_span(
f"beeai.swarm.iteration.{iteration + 1}",
attributes={
"iteration.number": iteration + 1,
"iteration.total": iterations,
"swarm.collective_intelligence": collective_intelligence,
}
) as iteration_span:
iteration_solutions = []
# Each agent attempts to solve the problem
for agent in self.agents:
with tracer.start_as_current_span(
f"beeai.agent.problem_solving.{agent.name}",
attributes={
"agent.name": agent.name,
"agent.experience.count": len(getattr(agent, 'experience', [])),
}
) as agent_span:
# Agent uses shared knowledge and personal experience
solution = self._agent_solve_with_intelligence(
agent, problem, collective_intelligence
)
iteration_solutions.append({
"agent": agent.name,
"solution": solution,
"confidence": self._calculate_confidence(agent, solution)
})
agent_span.set_attributes({
"agent.solution.confidence": iteration_solutions[-1]["confidence"],
"agent.solution.type": type(solution).__name__,
})
# Update shared knowledge through swarm communication
self._update_shared_knowledge(iteration_solutions)
# Calculate emergent collective intelligence
collective_intelligence = self._calculate_collective_intelligence(
iteration_solutions
)
# Learn from iteration
self._swarm_learning(iteration_solutions, collective_intelligence)
solutions.extend(iteration_solutions)
iteration_span.set_attributes({
"iteration.solutions_generated": len(iteration_solutions),
"iteration.collective_intelligence": collective_intelligence,
"iteration.shared_knowledge_size": len(self.shared_knowledge),
})
# Synthesize final solution through collective intelligence
final_solution = self._synthesize_collective_solution(solutions)
span.set_attributes({
"swarm.final_collective_intelligence": collective_intelligence,
"swarm.total_solutions_generated": len(solutions),
"swarm.final_solution.confidence": final_solution.get("confidence", 0.0),
"swarm.learning.memory_size": len(self.swarm_memory),
})
return final_solution
def _agent_solve_with_intelligence(self, agent: TracedBeeAgent, problem: Task, collective_intelligence: float):
"""Agent solves problem using individual and collective intelligence"""
# Simulate intelligent problem solving
base_solution = f"Solution by {agent.name}: {problem.description}"
# Apply collective intelligence boost
intelligence_boost = 1.0 + (collective_intelligence * 0.5)
# Add learning from experience
if hasattr(agent, 'experience') and agent.experience:
experience_boost = len(agent.experience) * 0.1
intelligence_boost += experience_boost
enhanced_solution = {
"base_solution": base_solution,
"intelligence_boost": intelligence_boost,
"collective_influence": collective_intelligence,
"agent_experience": len(getattr(agent, 'experience', [])),
}
return enhanced_solution
def _calculate_confidence(self, agent: TracedBeeAgent, solution: Dict[str, Any]) -> float:
"""Calculate agent's confidence in the solution"""
base_confidence = 0.5
# Boost confidence based on experience
experience_boost = len(getattr(agent, 'experience', [])) * 0.05
# Boost confidence based on intelligence boost
intelligence_boost = solution.get("intelligence_boost", 1.0) * 0.1
confidence = min(1.0, base_confidence + experience_boost + intelligence_boost)
return confidence
def _update_shared_knowledge(self, iteration_solutions: List[Dict[str, Any]]):
"""Update swarm's shared knowledge base"""
with tracer.start_as_current_span(
"beeai.swarm.update_shared_knowledge",
attributes={
"knowledge.solutions_to_process": len(iteration_solutions),
"knowledge.current_size": len(self.shared_knowledge),
}
) as span:
for solution_data in iteration_solutions:
agent_name = solution_data["agent"]
solution = solution_data["solution"]
confidence = solution_data["confidence"]
# Add high-confidence solutions to shared knowledge
if confidence > 0.7:
key = f"high_confidence_from_{agent_name}"
self.shared_knowledge[key] = {
"solution": solution,
"confidence": confidence,
"timestamp": time.time()
}
span.set_attributes({
"knowledge.updated_size": len(self.shared_knowledge),
"knowledge.high_confidence_additions": len([s for s in iteration_solutions if s["confidence"] > 0.7]),
})
def _calculate_collective_intelligence(self, iteration_solutions: List[Dict[str, Any]]) -> float:
"""Calculate emergent collective intelligence of the swarm"""
if not iteration_solutions:
return 0.0
# Average confidence across all agents
avg_confidence = sum(s["confidence"] for s in iteration_solutions) / len(iteration_solutions)
# Diversity bonus (different solution approaches)
solution_types = set(type(s["solution"]).__name__ for s in iteration_solutions)
diversity_bonus = len(solution_types) * 0.1
# Shared knowledge bonus
knowledge_bonus = len(self.shared_knowledge) * 0.05
collective_intelligence = min(1.0, avg_confidence + diversity_bonus + knowledge_bonus)
return collective_intelligence
def _swarm_learning(self, iteration_solutions: List[Dict[str, Any]], collective_intelligence: float):
"""Enable swarm-wide learning from the iteration"""
with tracer.start_as_current_span(
"beeai.swarm.learning",
attributes={
"learning.collective_intelligence": collective_intelligence,
"learning.solutions_count": len(iteration_solutions),
}
) as span:
# Update each agent's experience
for agent in self.agents:
if hasattr(agent, 'experience'):
agent.experience.append({
"iteration_intelligence": collective_intelligence,
"solutions_observed": len(iteration_solutions),
"timestamp": time.time()
})
# Limit experience memory
if len(agent.experience) > 10:
agent.experience = agent.experience[-10:]
# Update swarm memory
self.swarm_memory.append({
"collective_intelligence": collective_intelligence,
"solutions_generated": len(iteration_solutions),
"timestamp": time.time()
})
# Limit swarm memory
if len(self.swarm_memory) > 20:
self.swarm_memory = self.swarm_memory[-20:]
span.set_attributes({
"learning.agents_updated": len(self.agents),
"learning.swarm_memory_size": len(self.swarm_memory),
})
def _synthesize_collective_solution(self, all_solutions: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Synthesize the best collective solution from all iterations"""
with tracer.start_as_current_span(
"beeai.swarm.synthesize_collective_solution",
attributes={
"synthesis.total_solutions": len(all_solutions),
}
) as span:
if not all_solutions:
return {"error": "No solutions generated"}
# Find highest confidence solutions
high_confidence_solutions = [s for s in all_solutions if s["confidence"] > 0.8]
if not high_confidence_solutions:
high_confidence_solutions = sorted(all_solutions, key=lambda x: x["confidence"], reverse=True)[:3]
# Create collective solution
collective_solution = {
"collective_solution": "Synthesized from swarm intelligence",
"contributing_agents": list(set(s["agent"] for s in high_confidence_solutions)),
"confidence": sum(s["confidence"] for s in high_confidence_solutions) / len(high_confidence_solutions),
"solutions_considered": len(all_solutions),
"high_confidence_count": len(high_confidence_solutions),
"swarm_intelligence_level": self._calculate_collective_intelligence(all_solutions),
"synthesis_timestamp": time.time()
}
span.set_attributes({
"synthesis.high_confidence_solutions": len(high_confidence_solutions),
"synthesis.contributing_agents": len(collective_solution["contributing_agents"]),
"synthesis.final_confidence": collective_solution["confidence"],
"synthesis.intelligence_level": collective_solution["swarm_intelligence_level"],
})
return collective_solution
# Example: Intelligent Problem-Solving Swarm
def intelligent_swarm_example():
# Create intelligent swarm
swarm = IntelligentSwarm("problem_solving_collective", "collective")
# Add diverse intelligent agents
swarm.add_intelligent_agent(
TracedBeeAgent("analytical_thinker", "analyst", "gpt-4"),
learning_rate=0.15
)
swarm.add_intelligent_agent(
TracedBeeAgent("creative_solver", "creative", "gpt-4"),
learning_rate=0.12
)
swarm.add_intelligent_agent(
TracedBeeAgent("practical_implementer", "implementer", "gpt-4"),
learning_rate=0.10
)
swarm.add_intelligent_agent(
TracedBeeAgent("systems_thinker", "systems_analyst", "gpt-4"),
learning_rate=0.13
)
# Complex problem requiring collective intelligence
complex_problem = Task(
id="complex_001",
type="optimization",
description="Optimize urban traffic flow while reducing emissions and improving safety",
priority="critical"
)
print("Solving complex problem through swarm intelligence...")
solution = swarm.emergent_problem_solving(complex_problem, iterations=4)
print(f"\nCollective Solution Generated!")
print(f"Contributing agents: {solution['contributing_agents']}")
print(f"Final confidence: {solution['confidence']:.2f}")
print(f"Swarm intelligence level: {solution['swarm_intelligence_level']:.2f}")
print(f"Solutions considered: {solution['solutions_considered']}")
return solution
# Run intelligent swarm example
intelligent_swarm_example()
Next Steps
✅ Verify traces: Check your Orq.ai dashboard to see incoming traces
✅ Add custom attributes: Enhance traces with swarm-specific metadata
✅ Set up alerts: Configure monitoring for swarm coordination failures
✅ Explore metrics: Use trace data for swarm optimization
✅ Monitor collective intelligence: Track emergent behavior patterns
Related Documentation
Support
Updated about 24 hours ago