Haystack
Integrate Orq.ai with Deepset Haystack for RAG pipeline observability using OpenTelemetry
Getting Started
Haystack, by deepset, is a powerful framework for building RAG (Retrieval-Augmented Generation) pipelines and document search systems. Tracing Haystack with Orq.ai gives you detailed insight into pipeline performance, component interactions, retrieval effectiveness, and answer quality, helping you optimize your search and QA applications.
Prerequisites
Before you begin, ensure you have:
- An Orq.ai account and API key
- Haystack 2.x installed in your project
- Python 3.8+
- API keys for your chosen LLM and embedding providers
Install Dependencies
# Core Haystack and OpenTelemetry packages
pip install haystack-ai opentelemetry-sdk opentelemetry-exporter-otlp
# Additional instrumentation packages (OpenLLMetry is published as traceloop-sdk)
pip install openlit traceloop-sdk mlflow openinference-instrumentation-haystack
# LLM and embedding providers
pip install openai anthropic cohere
# Optional: Document processing and vector stores
pip install chroma-haystack pinecone-haystack weaviate-haystack
pip install pypdf python-docx
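After installing, it's worth confirming that you got Haystack 2.x (the haystack-ai package) rather than the legacy farm-haystack 1.x package. A minimal check using only the standard library:

# Quick sanity check that Haystack 2.x resolved correctly
from importlib.metadata import version

import haystack  # the import itself should succeed without errors

print(version("haystack-ai"))  # expect a 2.x version string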
Configure Orq.ai
Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:
Unix/Linux/macOS:
export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=haystack-app,service.version=1.0.0"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
export COHERE_API_KEY="<YOUR_COHERE_API_KEY>"
Windows (PowerShell):
$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=haystack-app,service.version=1.0.0"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"
$env:COHERE_API_KEY = "<YOUR_COHERE_API_KEY>"
Using .env file:
OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=haystack-app,service.version=1.0.0
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>
COHERE_API_KEY=<YOUR_COHERE_API_KEY>
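If you keep these settings in a .env file, load it before initializing any tracing SDK so the OTEL_* variables are visible to the exporter. A minimal sketch using python-dotenv (an assumed extra dependency, installed with pip install python-dotenv):

# Load .env before any tracing SDK initializes, so OTEL_* variables are set
import os

from dotenv import load_dotenv

load_dotenv()  # reads .env from the current working directory

assert os.getenv("OTEL_EXPORTER_OTLP_ENDPOINT"), "OTLP endpoint not configured"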
Integrations
Choose your preferred instrumentation approach for collecting OpenTelemetry traces:
OpenLit
Auto-instrumentation with minimal setup:
import openlit
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Initialize OpenLit
openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

# Your Haystack pipelines are automatically traced
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="Renewable energy sources such as solar and wind are sustainable and reduce emissions."),
])
retriever = InMemoryBM25Retriever(document_store=document_store)
prompt_builder = PromptBuilder(
    template="""Answer based on the context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}"""
)
generator = OpenAIGenerator(model="gpt-4")

# Create RAG pipeline (retriever -> prompt builder -> generator)
pipeline = Pipeline()
pipeline.add_component("retriever", retriever)
pipeline.add_component("prompt_builder", prompt_builder)
pipeline.add_component("generator", generator)
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "generator.prompt")

# Run pipeline - automatically traced
query = "What are the benefits of renewable energy?"
result = pipeline.run({
    "retriever": {"query": query},
    "prompt_builder": {"query": query},
})
OpenLLMetry
Non-intrusive tracing with decorators:
from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Point the Traceloop exporter at Orq.ai
Traceloop.init(
    app_name="haystack-app",
    api_endpoint="https://api.orq.ai/v2/otel",
    headers={"Authorization": "Bearer <ORQ_API_KEY>"},
)

@workflow(name="haystack-rag-pipeline")
def create_rag_system():
    # Initialize document store
    document_store = InMemoryDocumentStore()

    # Sample documents
    documents = [
        Document(content="Renewable energy sources include solar, wind, hydroelectric, and geothermal power. These sources are sustainable and have minimal environmental impact."),
        Document(content="Solar panels convert sunlight into electricity using photovoltaic cells. They are becoming increasingly cost-effective and efficient."),
        Document(content="Wind turbines generate electricity by converting wind energy into mechanical energy. Wind farms are common in areas with consistent wind patterns.")
    ]

    # Index documents: embed them, then write them to the store
    doc_embedder = OpenAIDocumentEmbedder()
    documents_with_embeddings = doc_embedder.run(documents=documents)["documents"]
    DocumentWriter(document_store).run(documents=documents_with_embeddings)

    # Create query pipeline
    query_pipeline = Pipeline()
    query_pipeline.add_component("text_embedder", OpenAITextEmbedder())
    query_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
    query_pipeline.add_component("prompt_builder", PromptBuilder(
        template="""Based on the following context, answer the question.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}

Answer:"""
    ))
    query_pipeline.add_component("generator", OpenAIGenerator(model="gpt-4"))
    query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
    query_pipeline.connect("retriever.documents", "prompt_builder.documents")
    query_pipeline.connect("prompt_builder.prompt", "generator.prompt")
    return query_pipeline

@task(name="execute-query")
def execute_query(pipeline: Pipeline, question: str):
    return pipeline.run({
        "text_embedder": {"text": question},
        "prompt_builder": {"query": question},
    })

# Create and use RAG system
rag_pipeline = create_rag_system()
answer = execute_query(rag_pipeline, "What are the advantages of solar energy?")
MLflow
MLOps-focused tracing with pipeline metrics:
import mlflow
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Enable MLflow autologging
mlflow.autolog()

@mlflow.trace
def build_and_run_qa_pipeline(query: str, top_k: int = 3):
    document_store = InMemoryDocumentStore()

    # Sample knowledge base
    docs = [
        Document(content="Machine learning is a subset of artificial intelligence that focuses on algorithms that can learn from data."),
        Document(content="Deep learning uses neural networks with multiple layers to model and understand complex patterns in data."),
        Document(content="Natural language processing (NLP) enables computers to understand, interpret, and generate human language."),
        Document(content="Computer vision allows machines to identify and process visual information from the world.")
    ]

    with mlflow.start_run():
        mlflow.log_param("query", query)
        mlflow.log_param("top_k", top_k)
        mlflow.log_param("document_count", len(docs))

        # Embed and store documents
        docs_with_embeddings = OpenAIDocumentEmbedder().run(documents=docs)["documents"]
        document_store.write_documents(docs_with_embeddings)

        # Build pipeline
        pipeline = Pipeline()
        pipeline.add_component("text_embedder", OpenAITextEmbedder())
        pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=top_k))
        pipeline.add_component("prompt_builder", PromptBuilder(
            template="""Answer the question based on the given context.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}

Answer:"""
        ))
        pipeline.add_component("generator", OpenAIGenerator(model="gpt-3.5-turbo"))

        # Connect components
        pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
        pipeline.connect("retriever.documents", "prompt_builder.documents")
        pipeline.connect("prompt_builder.prompt", "generator.prompt")

        # Execute query; keep the retriever's output in the result for logging
        result = pipeline.run(
            {
                "text_embedder": {"text": query},
                "prompt_builder": {"query": query},
            },
            include_outputs_from={"retriever"},
        )

        # Log results
        retrieved_docs = result.get("retriever", {}).get("documents", [])
        mlflow.log_metric("retrieved_documents", len(retrieved_docs))
        if retrieved_docs:
            avg_score = sum(doc.score for doc in retrieved_docs if doc.score) / len(retrieved_docs)
            mlflow.log_metric("avg_retrieval_score", avg_score)

        answer = result.get("generator", {}).get("replies", ["No answer generated"])[0]
        mlflow.log_param("generated_answer", answer[:200])  # First 200 chars
        return result

result = build_and_run_qa_pipeline("What is machine learning?")
OpenInference
Specialized tracing for retrieval and LLM operations:
from openinference.instrumentation.haystack import HaystackInstrumentor
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Initialize OpenTelemetry
resource = Resource.create({"service.name": "haystack-app"})
trace.set_tracer_provider(TracerProvider(resource=resource))
otlp_exporter = OTLPSpanExporter(
    endpoint="https://api.orq.ai/v2/otel/v1/traces",
    headers={"Authorization": "Bearer <ORQ_API_KEY>"}
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Instrument Haystack
HaystackInstrumentor().instrument()

def advanced_rag_pipeline():
    # Setup document store with rich content
    document_store = InMemoryDocumentStore()
    documents = [
        Document(
            content="Python is a high-level programming language known for its simplicity and readability. It's widely used in web development, data science, and AI.",
            meta={"category": "programming", "difficulty": "beginner"}
        ),
        Document(
            content="JavaScript is the programming language of the web. It enables interactive web pages and is used for both frontend and backend development.",
            meta={"category": "programming", "difficulty": "beginner"}
        ),
        Document(
            content="Machine learning algorithms can be supervised, unsupervised, or reinforcement learning. Each type solves different kinds of problems.",
            meta={"category": "ai", "difficulty": "intermediate"}
        ),
        Document(
            content="Neural networks are inspired by biological neurons and consist of interconnected nodes that process information in layers.",
            meta={"category": "ai", "difficulty": "advanced"}
        )
    ]

    # Embed and store documents
    documents_with_embeddings = OpenAIDocumentEmbedder().run(documents=documents)["documents"]
    document_store.write_documents(documents_with_embeddings)

    # Create comprehensive pipeline
    pipeline = Pipeline()
    pipeline.add_component("embedder", OpenAITextEmbedder())
    pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=2))
    pipeline.add_component("prompt_builder", PromptBuilder(
        template="""Based on the retrieved documents, provide a comprehensive answer to the question.

Context Documents:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}

Question: {{ query }}

Please provide a detailed answer based on the context:"""
    ))
    pipeline.add_component("generator", OpenAIGenerator(
        model="gpt-4",
        generation_kwargs={
            "max_tokens": 200,
            "temperature": 0.3
        }
    ))

    # Connect components
    pipeline.connect("embedder.embedding", "retriever.query_embedding")
    pipeline.connect("retriever.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "generator.prompt")

    # Execute queries with detailed tracking
    queries = [
        "What is Python programming language?",
        "Explain machine learning algorithms",
        "How do neural networks work?"
    ]
    results = []
    for query in queries:
        result = pipeline.run({
            "embedder": {"text": query},
            "prompt_builder": {"query": query},
        })
        results.append(result)
    return results

pipeline_results = advanced_rag_pipeline()
Manual OpenTelemetry
Fine-grained control over pipeline tracing:
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Initialize OpenTelemetry
resource = Resource.create({"service.name": "haystack-app"})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer("haystack")

# Configure OTLP exporter
otlp_exporter = OTLPSpanExporter(
    endpoint="https://api.orq.ai/v2/otel/v1/traces",
    headers={"Authorization": "Bearer <ORQ_API_KEY>"}
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

def traced_document_qa_system():
    with tracer.start_as_current_span("haystack-qa-system") as main_span:
        main_span.set_attribute("system.type", "document_qa")

        # Document preparation
        with tracer.start_as_current_span("document-preparation") as prep_span:
            document_store = InMemoryDocumentStore()
            documents = [
                Document(content="Climate change refers to long-term shifts in global temperatures and weather patterns. While climate change is natural, human activities have been the main driver since the 1800s."),
                Document(content="Renewable energy comes from sources that are naturally replenished, such as sunlight, wind, rain, tides, waves, and geothermal heat."),
                Document(content="Carbon footprint is the total amount of greenhouse gases produced directly and indirectly by human activities, measured in carbon dioxide equivalent."),
                Document(content="Sustainable development meets present needs without compromising the ability of future generations to meet their own needs.")
            ]
            document_store.write_documents(documents)
            prep_span.set_attribute("documents.count", len(documents))
            prep_span.set_attribute("document_store.type", "in_memory")

        # Pipeline construction
        with tracer.start_as_current_span("pipeline-construction") as build_span:
            pipeline = Pipeline()

            # Add components
            retriever = InMemoryBM25Retriever(document_store=document_store, top_k=2)
            prompt_builder = PromptBuilder(
                template="""Answer the question based on the provided context.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}

Answer:"""
            )
            generator = OpenAIGenerator(
                model="gpt-4",
                generation_kwargs={"temperature": 0.2, "max_tokens": 150}
            )
            pipeline.add_component("retriever", retriever)
            pipeline.add_component("prompt_builder", prompt_builder)
            pipeline.add_component("generator", generator)
            pipeline.connect("retriever.documents", "prompt_builder.documents")
            pipeline.connect("prompt_builder.prompt", "generator.prompt")
            build_span.set_attribute("pipeline.components", 3)
            build_span.set_attribute("retriever.top_k", 2)
            build_span.set_attribute("generator.model", "gpt-4")

        # Execute multiple queries
        queries = [
            "What is climate change?",
            "How does renewable energy help the environment?",
            "What is a carbon footprint?"
        ]
        results = []
        for i, query in enumerate(queries):
            with tracer.start_as_current_span(f"query-execution-{i}") as query_span:
                query_span.set_attribute("query.text", query)
                query_span.set_attribute("query.index", i)
                try:
                    with tracer.start_as_current_span("pipeline-run") as run_span:
                        result = pipeline.run(
                            {
                                "retriever": {"query": query},
                                "prompt_builder": {"query": query},
                            },
                            include_outputs_from={"retriever"},
                        )

                        # Extract and log results
                        retrieved_docs = result.get("retriever", {}).get("documents", [])
                        generated_reply = result.get("generator", {}).get("replies", [""])[0]
                        run_span.set_attribute("retrieved_documents.count", len(retrieved_docs))
                        run_span.set_attribute("generated_reply.length", len(generated_reply))
                        if retrieved_docs:
                            avg_score = sum(doc.score for doc in retrieved_docs if doc.score) / len(retrieved_docs)
                            run_span.set_attribute("retrieval.avg_score", avg_score)
                    query_span.set_attribute("query.success", True)
                    query_span.set_attribute("answer.preview", generated_reply[:100])
                    results.append(result)
                except Exception as e:
                    query_span.record_exception(e)
                    query_span.set_attribute("query.failed", True)
                    raise

        main_span.set_attribute("total_queries", len(queries))
        main_span.set_attribute("successful_queries", len(results))
        main_span.set_attribute("system.completed", True)
        return results

qa_results = traced_document_qa_system()
Examples
Basic RAG Pipeline
import openlit
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Initialize tracing
openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

def basic_rag_example():
    # Create document store and add sample documents
    document_store = InMemoryDocumentStore()
    docs = [
        Document(content="The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris, France."),
        Document(content="The Great Wall of China is a series of fortifications built across northern China."),
        Document(content="Machu Picchu is a 15th-century Inca citadel in Peru, situated on a mountain ridge."),
        Document(content="The Colosseum is an oval amphitheatre in the centre of Rome, Italy.")
    ]

    # Embed and store documents
    docs_with_embeddings = OpenAIDocumentEmbedder().run(documents=docs)["documents"]
    document_store.write_documents(docs_with_embeddings)

    # Create RAG pipeline
    pipeline = Pipeline()
    pipeline.add_component("text_embedder", OpenAITextEmbedder())
    pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=2))
    pipeline.add_component("prompt_builder", PromptBuilder(
        template="""Answer the question based on the context provided.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}

Answer:"""
    ))
    pipeline.add_component("generator", OpenAIGenerator(model="gpt-3.5-turbo"))

    # Connect components
    pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
    pipeline.connect("retriever.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "generator.prompt")

    # Ask a question
    question = "Tell me about historical monuments in France"
    result = pipeline.run({
        "text_embedder": {"text": question},
        "prompt_builder": {"query": question},
    })
    return result["generator"]["replies"][0]

answer = basic_rag_example()
print(f"Answer: {answer}")
Long-Form Document Processing
import openlit
from haystack import Document, Pipeline
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

def document_processing_pipeline():
    # Sample long-form documents
    documents = [
        Document(content="""
Artificial Intelligence (AI) has revolutionized numerous industries over the past decade.
From healthcare to finance, AI applications are transforming how we work and live.
Machine learning, a subset of AI, enables computers to learn patterns from data without
explicit programming. Deep learning, using neural networks, has achieved breakthrough
results in image recognition, natural language processing, and speech recognition.

The future of AI holds immense potential. Generative AI models can create human-like
text, images, and even code. However, ethical considerations around bias, privacy,
and job displacement remain important challenges to address.
""", meta={"source": "AI_Overview.txt", "category": "technology"}),
        Document(content="""
Climate change represents one of the most pressing challenges of our time. Global
temperatures have risen significantly due to increased greenhouse gas emissions from
human activities. The effects include rising sea levels, extreme weather events,
and ecosystem disruptions.

Renewable energy sources like solar, wind, and hydroelectric power offer sustainable
alternatives to fossil fuels. Energy efficiency improvements and carbon capture
technologies also play crucial roles in mitigation efforts. International cooperation
through agreements like the Paris Climate Accord is essential for global action.
""", meta={"source": "Climate_Report.pdf", "category": "environment"})
    ]

    # Create processing pipeline
    document_store = InMemoryDocumentStore()

    # Build indexing pipeline: split long documents, then write the chunks
    indexing_pipeline = Pipeline()
    indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=3))
    indexing_pipeline.add_component("writer", DocumentWriter(document_store))
    indexing_pipeline.connect("splitter", "writer")

    # Process documents
    indexing_pipeline.run({"splitter": {"documents": documents}})

    # Build query pipeline
    query_pipeline = Pipeline()
    query_pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store, top_k=3))
    query_pipeline.add_component("prompt_builder", PromptBuilder(
        template="""Use the following context to answer the question comprehensively.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}

Provide a detailed answer based on the context:"""
    ))
    query_pipeline.add_component("generator", OpenAIGenerator(
        model="gpt-4",
        generation_kwargs={"temperature": 0.1, "max_tokens": 200}
    ))
    query_pipeline.connect("retriever.documents", "prompt_builder.documents")
    query_pipeline.connect("prompt_builder.prompt", "generator.prompt")

    # Execute queries
    questions = [
        "What are the main applications of artificial intelligence?",
        "How does climate change affect the environment?",
        "What are renewable energy sources?"
    ]
    answers = []
    for question in questions:
        result = query_pipeline.run(
            {
                "retriever": {"query": question},
                "prompt_builder": {"query": question},
            },
            include_outputs_from={"retriever"},
        )
        answers.append({
            "question": question,
            "answer": result["generator"]["replies"][0],
            "sources": [doc.meta.get("source", "Unknown") for doc in result["retriever"]["documents"]]
        })
    return answers

processed_results = document_processing_pipeline()
for result in processed_results:
    print(f"Q: {result['question']}")
    print(f"A: {result['answer'][:100]}...")
    print(f"Sources: {result['sources']}\n")
Advanced Pipeline with Custom Components
import re
from typing import Any, Dict, List

import openlit
from haystack import Document, Pipeline, component
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

@component
class QueryAnalyzer:
    """Custom component to analyze and enhance queries"""

    @component.output_types(enhanced_query=str, query_type=str, entities=List[str])
    def run(self, query: str) -> Dict[str, Any]:
        # Simple query analysis
        query_lower = query.lower()

        # Determine query type
        if any(word in query_lower for word in ["what", "define", "explain"]):
            query_type = "definition"
        elif any(word in query_lower for word in ["how", "process", "steps"]):
            query_type = "process"
        elif any(word in query_lower for word in ["compare", "difference", "versus"]):
            query_type = "comparison"
        else:
            query_type = "general"

        # Extract potential entities (simple approach)
        entities = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', query)

        # Enhance query based on type
        if query_type == "definition":
            enhanced_query = f"Provide a comprehensive definition and explanation of: {query}"
        elif query_type == "process":
            enhanced_query = f"Describe the process and steps involved in: {query}"
        elif query_type == "comparison":
            enhanced_query = f"Compare and contrast the following: {query}"
        else:
            enhanced_query = query

        return {
            "enhanced_query": enhanced_query,
            "query_type": query_type,
            "entities": entities
        }

@component
class AnswerPostProcessor:
    """Custom component to post-process generated answers"""

    @component.output_types(processed_answer=str, confidence_score=float)
    def run(self, replies: List[str], query_type: str) -> Dict[str, Any]:
        # Take the first generated reply and apply simple post-processing
        processed_answer = (replies[0] if replies else "").strip()

        # Add structure based on query type
        if query_type == "definition" and not processed_answer.startswith("Definition:"):
            processed_answer = f"Definition: {processed_answer}"
        elif query_type == "process" and "steps" not in processed_answer.lower():
            processed_answer = f"Process Overview: {processed_answer}"

        # Simple confidence scoring (a real implementation would be more sophisticated)
        confidence_score = 0.8 if len(processed_answer) > 100 else 0.6

        return {
            "processed_answer": processed_answer,
            "confidence_score": confidence_score
        }

def advanced_custom_pipeline():
    # Setup document store with diverse content
    document_store = InMemoryDocumentStore()
    documents = [
        Document(content="Photosynthesis is the process by which plants use sunlight, water, and carbon dioxide to produce glucose and oxygen. This process occurs in chloroplasts and involves light-dependent and light-independent reactions."),
        Document(content="Machine learning is a method of data analysis that automates analytical model building. It uses algorithms that iteratively learn from data, allowing computers to find insights without being explicitly programmed."),
        Document(content="The water cycle describes how water moves through Earth's atmosphere, land, and oceans. It includes processes like evaporation, condensation, precipitation, and collection."),
        Document(content="Renewable energy sources include solar power, wind power, hydroelectric power, and geothermal energy. These sources are sustainable and have lower environmental impact compared to fossil fuels.")
    ]
    document_store.write_documents(documents)

    # Build advanced pipeline with custom components
    pipeline = Pipeline()

    # Add components
    pipeline.add_component("query_analyzer", QueryAnalyzer())
    pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store, top_k=2))
    pipeline.add_component("prompt_builder", PromptBuilder(
        template="""Based on the provided context, answer the question accurately and comprehensively.

Context Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Enhanced Query: {{ enhanced_query }}

Please provide a detailed response:"""
    ))
    pipeline.add_component("generator", OpenAIGenerator(
        model="gpt-4",
        generation_kwargs={"temperature": 0.2, "max_tokens": 250}
    ))
    pipeline.add_component("post_processor", AnswerPostProcessor())

    # Connect components (the analyzer's enhanced query fans out to both
    # the retriever and the prompt builder)
    pipeline.connect("query_analyzer.enhanced_query", "retriever.query")
    pipeline.connect("query_analyzer.enhanced_query", "prompt_builder.enhanced_query")
    pipeline.connect("retriever.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "generator.prompt")
    pipeline.connect("generator.replies", "post_processor.replies")
    pipeline.connect("query_analyzer.query_type", "post_processor.query_type")

    # Test with various query types
    test_queries = [
        "What is photosynthesis?",
        "How does the water cycle work?",
        "Compare renewable energy and fossil fuels",
        "Explain machine learning algorithms"
    ]
    results = []
    for query in test_queries:
        result = pipeline.run(
            {"query_analyzer": {"query": query}},
            include_outputs_from={"query_analyzer"},
        )
        results.append({
            "original_query": query,
            "query_type": result["query_analyzer"]["query_type"],
            "entities": result["query_analyzer"]["entities"],
            "final_answer": result["post_processor"]["processed_answer"],
            "confidence": result["post_processor"]["confidence_score"]
        })
    return results

custom_results = advanced_custom_pipeline()
for result in custom_results:
    print(f"Query: {result['original_query']}")
    print(f"Type: {result['query_type']}")
    print(f"Entities: {result['entities']}")
    print(f"Answer: {result['final_answer'][:150]}...")
    print(f"Confidence: {result['confidence']:.2f}\n")
Next Steps
✅ Verify traces: Check your Orq.ai dashboard to see incoming Haystack traces
✅ Monitor pipeline performance: Track component latencies and retrieval effectiveness
✅ Add custom spans: Enhance traces with domain-specific metadata (see the sketch below)
✅ Set up alerts: Configure monitoring for pipeline failures or performance degradation
✅ Analyze retrieval quality: Use trace data to optimize document chunking and retrieval parameters
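For custom spans, a minimal sketch wrapping a pipeline run with domain-specific attributes. It assumes a `pipeline` wired like the RAG examples above; the span name and `app.*` attribute keys are illustrative, not an Orq.ai convention:

# Wrap a pipeline run in a custom span carrying domain metadata
from opentelemetry import trace

tracer = trace.get_tracer("haystack-app")

question = "How do I reset my password?"
with tracer.start_as_current_span("support-query") as span:
    span.set_attribute("app.channel", "web_chat")  # illustrative metadata
    span.set_attribute("app.query.text", question)
    result = pipeline.run({
        "retriever": {"query": question},
        "prompt_builder": {"query": question},
    })
    span.set_attribute("app.replies.count", len(result["generator"]["replies"]))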
Related Documentation
- Orq.ai Dashboard Guide
- OpenTelemetry Best Practices
- Haystack Documentation
- RAG Pipeline Optimization Guide