Haystack

Integrate Orq.ai with Deepset Haystack for RAG pipeline observability using OpenTelemetry

Getting Started

Deepset's Haystack is an open-source framework for building RAG (Retrieval-Augmented Generation) pipelines and document search systems. Tracing Haystack with Orq.ai gives you insight into pipeline performance, component interactions, retrieval effectiveness, and answer-generation quality, so you can optimize your search and QA applications.

Prerequisites

Before you begin, ensure you have:

  • An Orq.ai account and API key
  • Haystack 2.x installed in your project
  • Python 3.8+
  • API keys for your chosen LLM and embedding providers

Install Dependencies

# Core Haystack and OpenTelemetry packages
pip install haystack-ai opentelemetry-sdk opentelemetry-exporter-otlp

# Additional instrumentation packages (install the one you use below)
pip install openlit traceloop-sdk mlflow openinference-instrumentation-haystack

# LLM and embedding providers
pip install openai anthropic cohere

# Optional: Document processing and vector stores
pip install chroma-haystack pinecone-haystack weaviate-haystack
pip install pypdf python-docx

Configure Orq.ai

Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:

Unix/Linux/macOS:

export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=haystack-app,service.version=1.0.0"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
export COHERE_API_KEY="<YOUR_COHERE_API_KEY>"

Windows (PowerShell):

$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=haystack-app,service.version=1.0.0"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"
$env:COHERE_API_KEY = "<YOUR_COHERE_API_KEY>"

Using .env file:

OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=haystack-app,service.version=1.0.0
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>
COHERE_API_KEY=<YOUR_COHERE_API_KEY>
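
If you keep these settings in a .env file, load it before any tracing SDK initializes so the exporters can see the variables. A minimal sketch, assuming the python-dotenv package is installed:

# Load .env into os.environ before initializing any tracing SDK
from dotenv import load_dotenv

load_dotenv()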

Integrations

Choose your preferred OpenTelemetry framework for collecting traces:

OpenLit

Auto-instrumentation with minimal setup:

import openlit
from haystack import Pipeline, Document
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Initialize OpenLit
openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

# Your Haystack pipelines are automatically traced
document_store = InMemoryDocumentStore()
document_store.write_documents([
    Document(content="Renewable energy sources such as solar and wind are sustainable and reduce emissions.")
])

prompt_template = """Answer based on the context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}"""

# Create RAG pipeline (the PromptBuilder renders retrieved documents into the LLM prompt)
pipeline = Pipeline()
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
pipeline.add_component("generator", OpenAIGenerator(model="gpt-4"))
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder.prompt", "generator.prompt")

# Run pipeline - automatically traced
query = "What are the benefits of renewable energy?"
result = pipeline.run({
    "retriever": {"query": query},
    "prompt_builder": {"query": query}
})
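
If the OTEL_EXPORTER_OTLP_ENDPOINT and OTEL_EXPORTER_OTLP_HEADERS variables from the Configure Orq.ai step are already exported, openlit.init() should pick them up from the environment, and the explicit arguments above can be omitted.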

OpenLLMetry

Non-intrusive tracing with decorators:

from traceloop.sdk import Traceloop
from traceloop.sdk.decorators import workflow, task
from haystack import Pipeline, Document
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

Traceloop.init()

prompt_template = """Based on the following context, answer the question.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}

Answer:"""

@workflow(name="haystack-rag-pipeline")
def create_rag_system():
    # Initialize document store
    document_store = InMemoryDocumentStore()

    # Sample documents
    documents = [
        Document(content="Renewable energy sources include solar, wind, hydroelectric, and geothermal power. These sources are sustainable and have minimal environmental impact."),
        Document(content="Solar panels convert sunlight into electricity using photovoltaic cells. They are becoming increasingly cost-effective and efficient."),
        Document(content="Wind turbines generate electricity by converting wind energy into mechanical energy. Wind farms are common in areas with consistent wind patterns.")
    ]

    # Index documents: embed them, then write them to the store
    doc_embedder = OpenAIDocumentEmbedder()
    documents_with_embeddings = doc_embedder.run(documents=documents)["documents"]
    DocumentWriter(document_store=document_store).run(documents=documents_with_embeddings)

    # Create query pipeline
    query_pipeline = Pipeline()
    query_pipeline.add_component("text_embedder", OpenAITextEmbedder())
    query_pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store))
    query_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
    query_pipeline.add_component("generator", OpenAIGenerator(model="gpt-4"))

    query_pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
    query_pipeline.connect("retriever.documents", "prompt_builder.documents")
    query_pipeline.connect("prompt_builder.prompt", "generator.prompt")

    return query_pipeline

@task(name="execute-query")
def execute_query(pipeline: Pipeline, question: str):
    return pipeline.run({
        "text_embedder": {"text": question},
        "prompt_builder": {"query": question}
    })

# Create and use RAG system
rag_pipeline = create_rag_system()
answer = execute_query(rag_pipeline, "What are the advantages of solar energy?")
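
Traceloop.init() takes its exporter configuration from the environment, so with the variables from the Configure Orq.ai step exported no arguments are needed; depending on the SDK version you can also pass the endpoint and headers explicitly (for example via its api_endpoint and headers parameters).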

MLflow

MLOps-focused tracing with pipeline metrics:

import mlflow
from haystack import Pipeline, Document
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Enable MLflow autologging (spans are recorded via the @mlflow.trace decorator below)
mlflow.autolog()

prompt_template = """Answer the question based on the given context.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}

Answer:"""

@mlflow.trace
def build_and_run_qa_pipeline(query: str, top_k: int = 3):
    document_store = InMemoryDocumentStore()

    # Sample knowledge base
    docs = [
        Document(content="Machine learning is a subset of artificial intelligence that focuses on algorithms that can learn from data."),
        Document(content="Deep learning uses neural networks with multiple layers to model and understand complex patterns in data."),
        Document(content="Natural language processing (NLP) enables computers to understand, interpret, and generate human language."),
        Document(content="Computer vision allows machines to identify and process visual information from the world.")
    ]

    with mlflow.start_run():
        mlflow.log_param("query", query)
        mlflow.log_param("top_k", top_k)
        mlflow.log_param("document_count", len(docs))

        # Embed and store documents
        doc_embedder = OpenAIDocumentEmbedder()
        docs_with_embeddings = doc_embedder.run(documents=docs)["documents"]
        document_store.write_documents(docs_with_embeddings)

        # Build pipeline
        pipeline = Pipeline()
        pipeline.add_component("text_embedder", OpenAITextEmbedder())
        pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=top_k))
        pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
        pipeline.add_component("generator", OpenAIGenerator(model="gpt-3.5-turbo"))

        # Connect components
        pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
        pipeline.connect("retriever.documents", "prompt_builder.documents")
        pipeline.connect("prompt_builder.prompt", "generator.prompt")

        # Execute query (include_outputs_from keeps the retriever's documents in the result)
        result = pipeline.run(
            {
                "text_embedder": {"text": query},
                "prompt_builder": {"query": query}
            },
            include_outputs_from={"retriever"}
        )

        # Log results
        retrieved_docs = result.get("retriever", {}).get("documents", [])
        mlflow.log_metric("retrieved_documents", len(retrieved_docs))

        scores = [doc.score for doc in retrieved_docs if doc.score is not None]
        if scores:
            mlflow.log_metric("avg_retrieval_score", sum(scores) / len(scores))

        answer = result.get("generator", {}).get("replies", ["No answer generated"])[0]
        mlflow.log_param("generated_answer", answer[:200])  # first 200 chars

        return result

result = build_and_run_qa_pipeline("What is machine learning?")
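
By default MLflow records these traces in its own tracking backend; recent MLflow versions can additionally export traces over OTLP when OTEL_EXPORTER_OTLP_TRACES_ENDPOINT (or the generic endpoint variable from the Configure Orq.ai step) is set, though exact behavior depends on your MLflow version.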

OpenInference

Specialized tracing for retrieval and LLM operations:

from openinference.instrumentation.haystack import HaystackInstrumentor
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from haystack import Pipeline, Document
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Initialize OpenTelemetry
resource = Resource.create({"service.name": "haystack-app"})
trace.set_tracer_provider(TracerProvider(resource=resource))

otlp_exporter = OTLPSpanExporter(
    endpoint="https://api.orq.ai/v2/otel/v1/traces",
    headers={"Authorization": "Bearer <ORQ_API_KEY>"}
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

# Instrument Haystack
HaystackInstrumentor().instrument()

def advanced_rag_pipeline():
    # Setup document store with rich content
    document_store = InMemoryDocumentStore()

    documents = [
        Document(
            content="Python is a high-level programming language known for its simplicity and readability. It's widely used in web development, data science, and AI.",
            meta={"category": "programming", "difficulty": "beginner"}
        ),
        Document(
            content="JavaScript is the programming language of the web. It enables interactive web pages and is used for both frontend and backend development.",
            meta={"category": "programming", "difficulty": "beginner"}
        ),
        Document(
            content="Machine learning algorithms can be supervised, unsupervised, or reinforcement learning. Each type solves different kinds of problems.",
            meta={"category": "ai", "difficulty": "intermediate"}
        ),
        Document(
            content="Neural networks are inspired by biological neurons and consist of interconnected nodes that process information in layers.",
            meta={"category": "ai", "difficulty": "advanced"}
        )
    ]

    # Embed documents with a document embedder, then index them
    doc_embedder = OpenAIDocumentEmbedder()
    documents_with_embeddings = doc_embedder.run(documents=documents)["documents"]
    document_store.write_documents(documents_with_embeddings)

    # RAG prompt rendered by the PromptBuilder from the retrieved documents
    prompt_template = """Based on the retrieved documents, provide a comprehensive answer to the question.

Context Documents:
{% for doc in documents %}
- {{ doc.content }}
{% endfor %}

Question: {{ query }}

Please provide a detailed answer based on the context:"""

    # Create comprehensive pipeline
    pipeline = Pipeline()
    pipeline.add_component("embedder", OpenAITextEmbedder())
    pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=2))
    pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
    pipeline.add_component("generator", OpenAIGenerator(
        model="gpt-4",
        generation_kwargs={
            "max_tokens": 200,
            "temperature": 0.3
        }
    ))

    # Connect components
    pipeline.connect("embedder.embedding", "retriever.query_embedding")
    pipeline.connect("retriever.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "generator.prompt")

    # Execute queries with detailed tracking
    queries = [
        "What is Python programming language?",
        "Explain machine learning algorithms",
        "How do neural networks work?"
    ]

    results = []
    for query in queries:
        result = pipeline.run({
            "embedder": {"text": query},
            "prompt_builder": {"query": query}
        })
        results.append(result)

    return results

pipeline_results = advanced_rag_pipeline()
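
HaystackInstrumentor().instrument() patches Haystack globally, so every pipeline created afterwards is traced without further changes; calling uninstrument() on the instrumentor reverses this, which is handy in test teardown.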

Manual OpenTelemetry

Fine-grained control over pipeline tracing:

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from haystack import Pipeline, Document
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Initialize OpenTelemetry
resource = Resource.create({"service.name": "haystack-app"})
trace.set_tracer_provider(TracerProvider(resource=resource))
tracer = trace.get_tracer("haystack")

# Configure OTLP exporter
otlp_exporter = OTLPSpanExporter(
    endpoint="https://api.orq.ai/v2/otel/v1/traces",
    headers={"Authorization": "Bearer <ORQ_API_KEY>"}
)
span_processor = BatchSpanProcessor(otlp_exporter)
trace.get_tracer_provider().add_span_processor(span_processor)

def traced_document_qa_system():
    with tracer.start_as_current_span("haystack-qa-system") as main_span:
        main_span.set_attribute("system.type", "document_qa")

        # Document preparation
        with tracer.start_as_current_span("document-preparation") as prep_span:
            document_store = InMemoryDocumentStore()

            documents = [
                Document(content="Climate change refers to long-term shifts in global temperatures and weather patterns. While climate change is natural, human activities have been the main driver since the 1800s."),
                Document(content="Renewable energy comes from sources that are naturally replenished, such as sunlight, wind, rain, tides, waves, and geothermal heat."),
                Document(content="Carbon footprint is the total amount of greenhouse gases produced directly and indirectly by human activities, measured in carbon dioxide equivalent."),
                Document(content="Sustainable development meets present needs without compromising the ability of future generations to meet their own needs.")
            ]

            document_store.write_documents(documents)
            prep_span.set_attribute("documents.count", len(documents))
            prep_span.set_attribute("document_store.type", "in_memory")

        # Pipeline construction
        with tracer.start_as_current_span("pipeline-construction") as build_span:
            pipeline = Pipeline()

            # Add components (the PromptBuilder renders retrieved documents into the prompt)
            retriever = InMemoryBM25Retriever(document_store=document_store, top_k=2)
            prompt_builder = PromptBuilder(
                template="""Answer the question based on the provided context.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}

Answer:"""
            )
            generator = OpenAIGenerator(
                model="gpt-4",
                generation_kwargs={"temperature": 0.2, "max_tokens": 150}
            )

            pipeline.add_component("retriever", retriever)
            pipeline.add_component("prompt_builder", prompt_builder)
            pipeline.add_component("generator", generator)
            pipeline.connect("retriever.documents", "prompt_builder.documents")
            pipeline.connect("prompt_builder.prompt", "generator.prompt")

            build_span.set_attribute("pipeline.components", 3)
            build_span.set_attribute("retriever.top_k", 2)
            build_span.set_attribute("generator.model", "gpt-4")

        # Execute multiple queries
        queries = [
            "What is climate change?",
            "How does renewable energy help the environment?",
            "What is a carbon footprint?"
        ]

        results = []
        for i, query in enumerate(queries):
            with tracer.start_as_current_span(f"query-execution-{i}") as query_span:
                query_span.set_attribute("query.text", query)
                query_span.set_attribute("query.index", i)

                try:
                    with tracer.start_as_current_span("pipeline-run") as run_span:
                        # include_outputs_from keeps the retriever's documents in
                        # the result even though the prompt builder consumes them
                        result = pipeline.run(
                            {
                                "retriever": {"query": query},
                                "prompt_builder": {"query": query}
                            },
                            include_outputs_from={"retriever"}
                        )

                        # Extract and log results
                        retrieved_docs = result.get("retriever", {}).get("documents", [])
                        generated_reply = result.get("generator", {}).get("replies", [""])[0]

                        run_span.set_attribute("retrieved_documents.count", len(retrieved_docs))
                        run_span.set_attribute("generated_reply.length", len(generated_reply))

                        scores = [doc.score for doc in retrieved_docs if doc.score is not None]
                        if scores:
                            run_span.set_attribute("retrieval.avg_score", sum(scores) / len(scores))

                    query_span.set_attribute("query.success", True)
                    query_span.set_attribute("answer.preview", generated_reply[:100])
                    results.append(result)

                except Exception as e:
                    query_span.record_exception(e)
                    query_span.set_attribute("query.failed", True)
                    raise

        main_span.set_attribute("total_queries", len(queries))
        main_span.set_attribute("successful_queries", len(results))
        main_span.set_attribute("system.completed", True)

        return results

qa_results = traced_document_qa_system()
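
Because BatchSpanProcessor exports spans asynchronously, a short-lived script can exit before the final batch is sent. A small sketch to flush and shut down the provider at the end of the run:

# Flush queued spans and shut down the exporter before the process exits
provider = trace.get_tracer_provider()
provider.force_flush()
provider.shutdown()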

Examples

Basic RAG Pipeline

import openlit
from haystack import Pipeline, Document
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.embedders import OpenAIDocumentEmbedder, OpenAITextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.document_stores.in_memory import InMemoryDocumentStore

# Initialize tracing
openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

def basic_rag_example():
    # Create document store and add sample documents
    document_store = InMemoryDocumentStore()

    docs = [
        Document(content="The Eiffel Tower is a wrought-iron lattice tower on the Champ de Mars in Paris, France."),
        Document(content="The Great Wall of China is a series of fortifications built across northern China."),
        Document(content="Machu Picchu is a 15th-century Inca citadel in Peru, situated on a mountain ridge."),
        Document(content="The Colosseum is an oval amphitheatre in the centre of Rome, Italy.")
    ]

    # Embed documents and index them
    doc_embedder = OpenAIDocumentEmbedder()
    docs_with_embeddings = doc_embedder.run(documents=docs)["documents"]
    document_store.write_documents(docs_with_embeddings)

    # RAG prompt rendered by the PromptBuilder from the retrieved documents
    prompt_template = """Answer the question based on the context provided.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}
Answer:"""

    # Create RAG pipeline
    pipeline = Pipeline()
    pipeline.add_component("text_embedder", OpenAITextEmbedder())
    pipeline.add_component("retriever", InMemoryEmbeddingRetriever(document_store=document_store, top_k=2))
    pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
    pipeline.add_component("generator", OpenAIGenerator(model="gpt-3.5-turbo"))

    # Connect components
    pipeline.connect("text_embedder.embedding", "retriever.query_embedding")
    pipeline.connect("retriever.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "generator.prompt")

    # Ask a question
    question = "Tell me about historical monuments in France"

    result = pipeline.run({
        "text_embedder": {"text": question},
        "prompt_builder": {"query": question}
    })

    return result["generator"]["replies"][0]

answer = basic_rag_example()
print(f"Answer: {answer}")

Long-Form Document Processing

import openlit
from haystack import Pipeline, Document
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore

openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

def document_processing_pipeline():
    # Sample long-form documents
    documents = [
        Document(content="""
        Artificial Intelligence (AI) has revolutionized numerous industries over the past decade.
        From healthcare to finance, AI applications are transforming how we work and live.
        Machine learning, a subset of AI, enables computers to learn patterns from data without
        explicit programming. Deep learning, using neural networks, has achieved breakthrough
        results in image recognition, natural language processing, and speech recognition.

        The future of AI holds immense potential. Generative AI models can create human-like
        text, images, and even code. However, ethical considerations around bias, privacy,
        and job displacement remain important challenges to address.
        """, meta={"source": "AI_Overview.txt", "category": "technology"}),

        Document(content="""
        Climate change represents one of the most pressing challenges of our time. Global
        temperatures have risen significantly due to increased greenhouse gas emissions from
        human activities. The effects include rising sea levels, extreme weather events,
        and ecosystem disruptions.

        Renewable energy sources like solar, wind, and hydroelectric power offer sustainable
        alternatives to fossil fuels. Energy efficiency improvements and carbon capture
        technologies also play crucial roles in mitigation efforts. International cooperation
        through agreements like the Paris Climate Accord is essential for global action.
        """, meta={"source": "Climate_Report.pdf", "category": "environment"})
    ]

    # Create processing pipeline
    document_store = InMemoryDocumentStore()

    # Build indexing pipeline
    indexing_pipeline = Pipeline()
    indexing_pipeline.add_component("splitter", DocumentSplitter(split_by="sentence", split_length=3))
    indexing_pipeline.add_component("writer", document_store.get_writer())
    indexing_pipeline.connect("splitter", "writer")

    # Process documents
    indexing_pipeline.run({"splitter": {"documents": documents}})

    # Query prompt rendered from the retrieved chunks
    prompt_template = """Use the following context to answer the question comprehensively.

Context:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Question: {{ query }}

Provide a detailed answer based on the context:"""

    # Build query pipeline
    query_pipeline = Pipeline()
    query_pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store, top_k=3))
    query_pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
    query_pipeline.add_component("generator", OpenAIGenerator(
        model="gpt-4",
        generation_kwargs={"temperature": 0.1, "max_tokens": 200}
    ))
    query_pipeline.connect("retriever.documents", "prompt_builder.documents")
    query_pipeline.connect("prompt_builder.prompt", "generator.prompt")

    # Execute queries
    questions = [
        "What are the main applications of artificial intelligence?",
        "How does climate change affect the environment?",
        "What are renewable energy sources?"
    ]

    answers = []
    for question in questions:
        result = query_pipeline.run(
            {
                "retriever": {"query": question},
                "prompt_builder": {"query": question}
            },
            include_outputs_from={"retriever"}  # keep retrieved docs for source attribution
        )

        answers.append({
            "question": question,
            "answer": result["generator"]["replies"][0],
            "sources": [doc.meta.get("source", "Unknown") for doc in result["retriever"]["documents"]]
        })

    return answers

processed_results = document_processing_pipeline()
for result in processed_results:
    print(f"Q: {result['question']}")
    print(f"A: {result['answer'][:100]}...")
    print(f"Sources: {result['sources']}\n")

Advanced Pipeline with Custom Components

import openlit
from haystack import Pipeline, Document, component
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
from haystack.document_stores.in_memory import InMemoryDocumentStore
from typing import List, Dict, Any
import re

openlit.init(
    otlp_endpoint="https://api.orq.ai/v2/otel",
    otlp_headers="Authorization=Bearer <ORQ_API_KEY>"
)

@component
class QueryAnalyzer:
    """Custom component to analyze and enhance queries"""

    @component.output_types(enhanced_query=str, query_type=str, entities=List[str])
    def run(self, query: str) -> Dict[str, Any]:
        # Simple query analysis
        query_lower = query.lower()

        # Determine query type
        if any(word in query_lower for word in ["what", "define", "explain"]):
            query_type = "definition"
        elif any(word in query_lower for word in ["how", "process", "steps"]):
            query_type = "process"
        elif any(word in query_lower for word in ["compare", "difference", "versus"]):
            query_type = "comparison"
        else:
            query_type = "general"

        # Extract potential entities (simple approach)
        entities = re.findall(r'\b[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*\b', query)

        # Enhance query based on type
        if query_type == "definition":
            enhanced_query = f"Provide a comprehensive definition and explanation of: {query}"
        elif query_type == "process":
            enhanced_query = f"Describe the process and steps involved in: {query}"
        elif query_type == "comparison":
            enhanced_query = f"Compare and contrast the following: {query}"
        else:
            enhanced_query = query

        return {
            "enhanced_query": enhanced_query,
            "query_type": query_type,
            "entities": entities
        }

@component
class AnswerPostProcessor:
    """Custom component to post-process generated answers"""

    @component.output_types(processed_answer=str, confidence_score=float)
    def run(self, replies: List[str], query_type: str) -> Dict[str, Any]:
        # The generator outputs a list of replies; post-process the first one
        processed_answer = (replies[0] if replies else "").strip()

        # Add structure based on query type
        if query_type == "definition" and not processed_answer.startswith("Definition:"):
            processed_answer = f"Definition: {processed_answer}"
        elif query_type == "process" and "steps" not in processed_answer.lower():
            processed_answer = f"Process Overview: {processed_answer}"

        # Simple confidence scoring (a real implementation would be more sophisticated)
        confidence_score = 0.8 if len(processed_answer) > 100 else 0.6

        return {
            "processed_answer": processed_answer,
            "confidence_score": confidence_score
        }

def advanced_custom_pipeline():
    # Setup document store with diverse content
    document_store = InMemoryDocumentStore()

    documents = [
        Document(content="Photosynthesis is the process by which plants use sunlight, water, and carbon dioxide to produce glucose and oxygen. This process occurs in chloroplasts and involves light-dependent and light-independent reactions."),
        Document(content="Machine learning is a method of data analysis that automates analytical model building. It uses algorithms that iteratively learn from data, allowing computers to find insights without being explicitly programmed."),
        Document(content="The water cycle describes how water moves through Earth's atmosphere, land, and oceans. It includes processes like evaporation, condensation, precipitation, and collection."),
        Document(content="Renewable energy sources include solar power, wind power, hydroelectric power, and geothermal energy. These sources are sustainable and have lower environmental impact compared to fossil fuels.")
    ]

    document_store.write_documents(documents)

    # Build advanced pipeline with custom components
    prompt_template = """Based on the provided context, answer the question accurately and comprehensively.

Context Documents:
{% for doc in documents %}
{{ doc.content }}
{% endfor %}

Enhanced Query: {{ enhanced_query }}

Please provide a detailed response:"""

    pipeline = Pipeline()

    # Add components
    pipeline.add_component("query_analyzer", QueryAnalyzer())
    pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store, top_k=2))
    pipeline.add_component("prompt_builder", PromptBuilder(template=prompt_template))
    pipeline.add_component("generator", OpenAIGenerator(
        model="gpt-4",
        generation_kwargs={"temperature": 0.2, "max_tokens": 250}
    ))
    pipeline.add_component("post_processor", AnswerPostProcessor())

    # Connect components
    pipeline.connect("query_analyzer.enhanced_query", "retriever.query")
    pipeline.connect("query_analyzer.enhanced_query", "prompt_builder.enhanced_query")
    pipeline.connect("retriever.documents", "prompt_builder.documents")
    pipeline.connect("prompt_builder.prompt", "generator.prompt")
    pipeline.connect("generator.replies", "post_processor.replies")
    pipeline.connect("query_analyzer.query_type", "post_processor.query_type")

    # Test with various query types
    test_queries = [
        "What is photosynthesis?",
        "How does the water cycle work?",
        "Compare renewable energy and fossil fuels",
        "Explain machine learning algorithms"
    ]

    results = []
    for query in test_queries:
        result = pipeline.run(
            {"query_analyzer": {"query": query}},
            include_outputs_from={"query_analyzer"}  # keep analyzer outputs for reporting
        )

        results.append({
            "original_query": query,
            "query_type": result["query_analyzer"]["query_type"],
            "entities": result["query_analyzer"]["entities"],
            "final_answer": result["post_processor"]["processed_answer"],
            "confidence": result["post_processor"]["confidence_score"]
        })

    return results

custom_results = advanced_custom_pipeline()
for result in custom_results:
    print(f"Query: {result['original_query']}")
    print(f"Type: {result['query_type']}")
    print(f"Entities: {result['entities']}")
    print(f"Answer: {result['final_answer'][:150]}...")
    print(f"Confidence: {result['confidence']:.2f}\n")

Next Steps

✅ Verify traces: Check your Orq.ai dashboard to see incoming Haystack traces
✅ Monitor pipeline performance: Track component latencies and retrieval effectiveness
✅ Add custom spans: Enhance traces with domain-specific metadata
✅ Set up alerts: Configure monitoring for pipeline failures or performance degradation
✅ Analyze retrieval quality: Use trace data to optimize document chunking and retrieval parameters
