Haystack
Integrate Orq.ai with Deepset Haystack for RAG pipeline observability using OpenTelemetry
Getting Started
Deepset Haystack is a powerful framework for building RAG (Retrieval-Augmented Generation) pipelines and document search systems. Tracing Haystack with Orq.ai provides comprehensive insights into pipeline performance, component interactions, retrieval effectiveness, and answer generation quality to optimize your search and QA applications.
Prerequisites
Before you begin, ensure you have:
- An Orq.ai account and API Key
- Haystack 2.x installed in your project
- Python 3.8+
- API keys for your chosen LLM and embedding providers
Install Dependencies
# Core Haystack and OpenTelemetry packages
pip install haystack-ai opentelemetry-sdk opentelemetry-exporter-otlp
# Additional instrumentation packages
pip install traceloop-sdk
# LLM and embedding providers
pip install openai anthropic cohere
# Optional: Document processing and vector stores
pip install chroma-haystack pinecone-haystack weaviate-haystack
pip install pypdf python-docx
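Optionally, you can sanity-check that the packages resolved before wiring anything up. This is a generic standard-library helper, not part of Orq.ai or Haystack; the name `check_installed` is used here only for illustration:

```python
import importlib.util

def check_installed(module_names):
    # Map each top-level module name to whether the current
    # interpreter can locate it, without actually importing it.
    return {name: importlib.util.find_spec(name) is not None for name in module_names}

# Top-level module names for the packages installed above
status = check_installed(["haystack", "opentelemetry", "traceloop"])
print(status)
```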
Configure Orq.ai
Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:
Unix/Linux/macOS:
export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=haystack-app,service.version=1.0.0"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
export COHERE_API_KEY="<YOUR_COHERE_API_KEY>"
Windows (PowerShell):
$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=haystack-app,service.version=1.0.0"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"
$env:COHERE_API_KEY = "<YOUR_COHERE_API_KEY>"
Using .env file:
OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=haystack-app,service.version=1.0.0
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>
COHERE_API_KEY=<YOUR_COHERE_API_KEY>
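Whichever option you use, a startup check that fails fast on missing configuration saves debugging time later. A minimal sketch; the helper name `missing_vars` and the exact variable list are illustrative, not part of any SDK:

```python
import os

REQUIRED_VARS = [
    "OTEL_EXPORTER_OTLP_ENDPOINT",
    "OTEL_EXPORTER_OTLP_HEADERS",
    "OPENAI_API_KEY",
]

def missing_vars(env=os.environ):
    # Return the names of required variables that are unset or empty.
    return [name for name in REQUIRED_VARS if not env.get(name)]

missing = missing_vars()
if missing:
    print(f"Warning: missing environment variables: {', '.join(missing)}")
```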
Integrations
Choose your preferred OpenTelemetry framework for collecting traces; this guide uses OpenLLMetry.
import os

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

Traceloop.init(
    api_endpoint="https://api.orq.ai/v2/otel",
    headers={"Authorization": f"Bearer {os.environ['ORQ_API_KEY']}"}
)

@workflow(name="haystack-rag-pipeline")
def create_rag_system():
    # Initialize document store and indexing components
    document_store = InMemoryDocumentStore()
    doc_embedder = OpenAIDocumentEmbedder()
    writer = DocumentWriter(document_store)

    # Sample documents
    documents = [
        Document(content="Renewable energy sources include solar, wind, hydroelectric, and geothermal power. These sources are sustainable and have minimal environmental impact."),
        Document(content="Solar panels convert sunlight into electricity using photovoltaic cells. They are becoming increasingly cost-effective and efficient."),
        Document(content="Wind turbines generate electricity by converting wind energy into mechanical energy. Wind farms are common in areas with consistent wind patterns.")
    ]

    # Embed and index the documents
    embedded = doc_embedder.run(documents=documents)
    writer.run(documents=embedded["documents"])

    # Prompt template rendered with the retrieved documents and the question
    template = """Based on the following context, answer the question.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ question }}

Answer:"""

    # Create query pipeline
    query_pipeline = Pipeline()
    query_pipeline.add_component("text_embedder", OpenAITextEmbedder())
    query_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store))
    query_pipeline.add_component("prompt_builder", PromptBuilder(template=template))
    query_pipeline.add_component("generator", OpenAIGenerator(model="gpt-4"))
    query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
    query_pipeline.connect("retriever.documents", "prompt_builder.documents")
    query_pipeline.connect("prompt_builder.prompt", "generator.prompt")
    return query_pipeline

@task(name="execute-query")
def execute_query(pipeline: Pipeline, question: str):
    result = pipeline.run({
        "text_embedder": {"text": question},
        "prompt_builder": {"question": question}
    })
    return result

# Create and use RAG system
rag_pipeline = create_rag_system()
answer = execute_query(rag_pipeline, "What are the advantages of solar energy?")
Examples
Advanced Pipeline with Custom Components
import os
import re
from typing import Any, Dict, List

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow
from haystack import Document, Pipeline, component
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

Traceloop.init(
    api_endpoint="https://api.orq.ai/v2/otel",
    headers={"Authorization": f"Bearer {os.environ['ORQ_API_KEY']}"}
)

@component
class QueryAnalyzer:
    """Custom component to analyze and enhance queries"""

    @component.output_types(enhanced_query=str, query_type=str, entities=List[str])
    def run(self, query: str) -> Dict[str, Any]:
        # Simple query analysis
        query_lower = query.lower()

        # Determine query type
        if any(word in query_lower for word in ["what", "define", "explain"]):
            query_type = "definition"
        elif any(word in query_lower for word in ["how", "process", "steps"]):
            query_type = "process"
        elif any(word in query_lower for word in ["compare", "difference", "versus"]):
            query_type = "comparison"
        else:
            query_type = "general"

        # Extract potential entities (simple approach)
        entities = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', query)

        # Enhance query based on type
        if query_type == "definition":
            enhanced_query = f"Provide a comprehensive definition and explanation of: {query}"
        elif query_type == "process":
            enhanced_query = f"Describe the process and steps involved in: {query}"
        elif query_type == "comparison":
            enhanced_query = f"Compare and contrast the following: {query}"
        else:
            enhanced_query = query

        return {
            "enhanced_query": enhanced_query,
            "query_type": query_type,
            "entities": entities
        }

@component
class AnswerPostProcessor:
    """Custom component to post-process generated answers"""

    @component.output_types(processed_answer=str, confidence_score=float)
    def run(self, replies: List[str], query_type: str) -> Dict[str, Any]:
        # The generator emits a list of replies; post-process the first one
        processed_answer = replies[0].strip() if replies else ""

        # Add structure based on query type
        if query_type == "definition" and not processed_answer.startswith("Definition:"):
            processed_answer = f"Definition: {processed_answer}"
        elif query_type == "process" and "steps" not in processed_answer.lower():
            processed_answer = f"Process Overview: {processed_answer}"

        # Simple confidence scoring (a real implementation would be more sophisticated)
        confidence_score = 0.8 if len(processed_answer) > 100 else 0.6

        return {
            "processed_answer": processed_answer,
            "confidence_score": confidence_score
        }

@workflow(name="haystack-rag-pipeline")
def advanced_custom_pipeline():
    # Setup document store with diverse content
    document_store = InMemoryDocumentStore()
    documents = [
        Document(content="Photosynthesis is the process by which plants use sunlight, water, and carbon dioxide to produce glucose and oxygen. This process occurs in chloroplasts and involves light-dependent and light-independent reactions."),
        Document(content="Machine learning is a method of data analysis that automates analytical model building. It uses algorithms that iteratively learn from data, allowing computers to find insights without being explicitly programmed."),
        Document(content="The water cycle describes how water moves through Earth's atmosphere, land, and oceans. It includes processes like evaporation, condensation, precipitation, and collection."),
        Document(content="Renewable energy sources include solar power, wind power, hydroelectric power, and geothermal energy. These sources are sustainable and have lower environmental impact compared to fossil fuels.")
    ]
    document_store.write_documents(documents)

    # Prompt template rendered with the retrieved documents and the enhanced query
    template = """Based on the provided context, answer the question accurately and comprehensively.

Context Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Enhanced Query: {{ enhanced_query }}

Please provide a detailed response:"""

    # Build advanced pipeline with custom components
    pipeline = Pipeline()
    pipeline.add_component("query_analyzer", QueryAnalyzer())
    pipeline.add_component("retriever", InMemoryBM25Retriever(document_store, top_k=2))
    pipeline.add_component("prompt_builder", PromptBuilder(template=template))
    pipeline.add_component("generator", OpenAIGenerator(
        model="gpt-4",
        generation_kwargs={"temperature": 0.2, "max_tokens": 250}
    ))
    pipeline.add_component("post_processor", AnswerPostProcessor())

    # Connect components
    pipeline.connect("query_analyzer.enhanced_query", "retriever.query")
    pipeline.connect("query_analyzer.enhanced_query", "prompt_builder.enhanced_query")
    pipeline.connect("retriever.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "generator.prompt")
    pipeline.connect("generator.replies", "post_processor.replies")
    pipeline.connect("query_analyzer.query_type", "post_processor.query_type")

    # Test with various query types
    test_queries = [
        "What is photosynthesis?",
        "How does the water cycle work?",
        "Compare renewable energy and fossil fuels",
        "Explain machine learning algorithms"
    ]

    results = []
    for query in test_queries:
        # include_outputs_from keeps the analyzer's outputs in the result
        # even though downstream components consume them
        result = pipeline.run(
            {"query_analyzer": {"query": query}},
            include_outputs_from={"query_analyzer"}
        )
        results.append({
            "original_query": query,
            "query_type": result["query_analyzer"]["query_type"],
            "entities": result["query_analyzer"]["entities"],
            "final_answer": result["post_processor"]["processed_answer"],
            "confidence": result["post_processor"]["confidence_score"]
        })
    return results
for result in custom_results:
print(f"Query: {result['original_query']}")
print(f"Type: {result['query_type']}")
print(f"Entities: {result['entities']}")
print(f"Answer: {result['final_answer'][:150]}...")
print(f"Confidence: {result['confidence']:.2f}\n")
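Because custom components are plain Python, their logic is easy to unit-test in isolation before you inspect traces. For example, the query-type heuristics above can be extracted into a standalone function (the name `classify_query` is used here only for illustration) and exercised without any pipeline, API key, or tracing:

```python
def classify_query(query: str) -> str:
    # Same keyword heuristics as QueryAnalyzer.run above
    q = query.lower()
    if any(word in q for word in ("what", "define", "explain")):
        return "definition"
    if any(word in q for word in ("how", "process", "steps")):
        return "process"
    if any(word in q for word in ("compare", "difference", "versus")):
        return "comparison"
    return "general"

print(classify_query("What is photosynthesis?"))  # definition
print(classify_query("Compare renewable energy and fossil fuels"))  # comparison
```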
View Traces
To view Traces, head to the Traces tab in the Orq.ai studio.
