Mastra

Integrate Orq.ai with Mastra (TypeScript) using OpenTelemetry

Getting Started

Mastra is a TypeScript framework for building AI-powered applications with pipelines, agents, and workflows. Tracing Mastra with Orq.ai gives you insight into pipeline execution, agent performance, workflow orchestration, and overall system reliability, so you can find and fix bottlenecks in your AI workflows.

Prerequisites

Before you begin, ensure you have:

  • An Orq.ai account and API key
  • Node.js 16+ and TypeScript support
  • Mastra installed in your project
  • API keys for your LLM providers and external services

Install Dependencies

# Core Mastra framework
npm install mastra

# OpenTelemetry packages for Node.js
npm install @opentelemetry/api @opentelemetry/sdk-node @opentelemetry/exporter-trace-otlp-http

# Additional OpenTelemetry instrumentation
npm install @opentelemetry/semantic-conventions @opentelemetry/resources

# LLM providers and tools (choose what you need)
npm install openai @anthropic-ai/sdk axios

Configure Orq.ai

Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:

Unix/Linux/macOS:

export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=mastra-app,service.version=1.0.0"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"

Windows (PowerShell):

$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=mastra-app,service.version=1.0.0"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"

Using .env file:

OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=mastra-app,service.version=1.0.0
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>
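Whichever method you use, it helps to fail fast at startup if a required variable is unset. A minimal sketch (the helper name and the variable list are illustrative, not part of Mastra or Orq.ai):

```typescript
// Hypothetical startup check: list required variables that are not set
function missingEnv(
  env: Record<string, string | undefined>,
  required: string[]
): string[] {
  return required.filter((name) => !env[name]);
}

// Example: only OPENAI_API_KEY is set, so both OTLP variables are reported
const missing = missingEnv({ OPENAI_API_KEY: "sk-test" }, [
  "OTEL_EXPORTER_OTLP_ENDPOINT",
  "OTEL_EXPORTER_OTLP_HEADERS",
  "OPENAI_API_KEY",
]);
```

In a real application you would pass `process.env` and throw (or log and exit) when `missing` is non-empty, before the tracing SDK starts.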

Integrations

Mastra supports manual OpenTelemetry integration for comprehensive observability:

Manual OpenTelemetry Setup

Create a tracing configuration module:

// lib/tracing.ts
import { NodeSDK } from "@opentelemetry/sdk-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { Resource } from "@opentelemetry/resources";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
import { diag, DiagConsoleLogger, DiagLogLevel, trace } from "@opentelemetry/api";
import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-node";

// Enable debugging (optional)
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO);

// Configure resource
const resource = new Resource({
  [SemanticResourceAttributes.SERVICE_NAME]: "mastra-app",
  [SemanticResourceAttributes.SERVICE_VERSION]: "1.0.0",
  [SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: process.env.NODE_ENV || "development",
});

// Configure exporter
// The env var holds the base URL; the HTTP exporter needs the full traces path
const rawHeaders = process.env.OTEL_EXPORTER_OTLP_HEADERS ?? "";
const traceExporter = new OTLPTraceExporter({
  url: `${process.env.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces`,
  headers: {
    // Take everything after the first "=" so "Bearer <key>" survives intact
    Authorization: rawHeaders.slice(rawHeaders.indexOf("=") + 1),
  },
});

// Initialize SDK
// Register the exporter through a single BatchSpanProcessor; passing both
// `traceExporter` and `spanProcessor` would register the exporter twice
const sdk = new NodeSDK({
  resource,
  spanProcessor: new BatchSpanProcessor(traceExporter),
});

// Start the SDK
sdk.start();

// Export tracer for use in application
export const tracer = trace.getTracer("mastra", "1.0.0");
export default sdk;
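The parsing above handles the single `Authorization` header this guide uses. The `OTEL_EXPORTER_OTLP_HEADERS` convention allows several comma-separated `Key=Value` pairs; if you need more than one header, a small parser sketch (the function name is illustrative) that splits each pair only on its first `=` keeps Bearer tokens intact:

```typescript
// Parse "k1=v1,k2=v2" into an object; values may themselves contain "="
function parseOtlpHeaders(raw: string): Record<string, string> {
  const headers: Record<string, string> = {};
  for (const pair of raw.split(",")) {
    const idx = pair.indexOf("=");
    if (idx > 0) {
      headers[pair.slice(0, idx).trim()] = pair.slice(idx + 1).trim();
    }
  }
  return headers;
}

const headers = parseOtlpHeaders("Authorization=Bearer abc123,x-tenant=acme");
```

The resulting object can be passed directly as the `headers` option of `OTLPTraceExporter`.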

Examples

Basic Pipeline with Tracing

import { Mastra, Pipeline } from "mastra";
import { tracer } from "./lib/tracing";
import OpenAI from "openai";

const openai = new OpenAI();

// Initialize Mastra
const mastra = new Mastra({
  llm: {
    provider: "openai",
    model: "gpt-4",
  },
});

// Create a traced pipeline
class TracedPipeline extends Pipeline {
  async run(input: string) {
    return await tracer.startActiveSpan("mastra.pipeline.run", async (span) => {
      try {
        // Add pipeline attributes
        span.setAttributes({
          "pipeline.name": this.name,
          "pipeline.input": input.substring(0, 100),
          "pipeline.stage": "initialization",
        });

        // Process through pipeline stages
        const processed = await this.processStages(input);
        
        span.setAttributes({
          "pipeline.success": true,
          "pipeline.output.length": processed.length,
        });
        
        return processed;
      } catch (error) {
        span.recordException(error as Error);
        span.setStatus({ code: 2, message: "Pipeline failed" }); // 2 = SpanStatusCode.ERROR
        throw error;
      } finally {
        span.end();
      }
    });
  }

  private async processStages(input: string): Promise<string> {
    // Stage 1: Preprocessing
    const preprocessed = await tracer.startActiveSpan("pipeline.stage.preprocess", async (span) => {
      span.setAttribute("stage.type", "preprocess");
      try {
        // Your preprocessing logic
        return input.toLowerCase().trim();
      } finally {
        span.end(); // startActiveSpan does not end spans automatically
      }
    });

    // Stage 2: LLM Processing
    const processed = await tracer.startActiveSpan("pipeline.stage.llm", async (span) => {
      span.setAttributes({
        "stage.type": "llm",
        "llm.model": "gpt-4",
      });

      try {
        const completion = await openai.chat.completions.create({
          model: "gpt-4",
          messages: [{ role: "user", content: preprocessed }],
        });

        return completion.choices[0].message.content || "";
      } finally {
        span.end();
      }
    });

    return processed;
  }
}
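The example above truncates the input with `substring(0, 100)` before attaching it as a span attribute, which keeps trace payloads small; later examples repeat the same pattern. If you do this in many places, a tiny helper (hypothetical, not part of Mastra) makes the intent explicit and marks truncated values:

```typescript
// Truncate a string attribute value, marking truncation with an ellipsis
function truncateAttr(value: string, max = 100): string {
  return value.length > max ? value.slice(0, max) + "..." : value;
}

const short = truncateAttr("hello", 100);          // unchanged
const long = truncateAttr("x".repeat(150), 100);   // cut to 100 chars + "..."
```

You could then write `span.setAttribute("pipeline.input", truncateAttr(input))` wherever an attribute might carry user-sized text.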

AI Agent with Tool Usage

import { Agent, Tool } from "mastra";
import { tracer } from "./lib/tracing";
import axios from "axios";

// Define tools with tracing
class WeatherTool extends Tool {
  name = "weather";
  description = "Get current weather for a location";

  async execute(location: string) {
    return await tracer.startActiveSpan("mastra.tool.weather", async (span) => {
      span.setAttributes({
        "tool.name": this.name,
        "tool.input.location": location,
      });

      try {
        const response = await axios.get(`https://api.weather.com/v1/location/${location}`);
        span.setAttribute("tool.success", true);
        return response.data;
      } catch (error) {
        span.recordException(error as Error);
        throw error;
      } finally {
        span.end();
      }
    });
  }
}

// Create traced agent
class TracedAgent extends Agent {
  constructor() {
    super({
      name: "assistant",
      tools: [new WeatherTool()],
      systemPrompt: "You are a helpful assistant with access to weather data.",
    });
  }

  async processMessage(message: string) {
    return await tracer.startActiveSpan("mastra.agent.process", async (span) => {
      span.setAttributes({
        "agent.name": this.name,
        "agent.message": message.substring(0, 100),
        "agent.tools.count": this.tools.length,
      });

      try {
        // Process message with potential tool usage
        const response = await this.run(message);
        
        span.setAttributes({
          "agent.success": true,
          "agent.response.length": response.length,
        });
        
        return response;
      } catch (error) {
        span.recordException(error as Error);
        throw error;
      } finally {
        span.end();
      }
    });
  }
}

Complex Workflow Orchestration

import { Workflow, Step } from "mastra";
import { tracer } from "./lib/tracing";

interface WorkflowContext {
  input: string;
  enrichedData?: any;
  analysis?: string;
  output?: string;
}

// Define workflow steps with tracing
class DataEnrichmentStep extends Step<WorkflowContext> {
  async execute(context: WorkflowContext) {
    return await tracer.startActiveSpan("workflow.step.enrichment", async (span) => {
      span.setAttributes({
        "step.name": "data_enrichment",
        "step.input": context.input.substring(0, 100),
      });

      // Simulate data enrichment
      const enrichedData = {
        original: context.input,
        metadata: { timestamp: new Date().toISOString() },
        enriched: true,
      };

      context.enrichedData = enrichedData;
      span.setAttribute("step.enriched", true);
      span.end();

      return context;
    });
  }
}

class AnalysisStep extends Step<WorkflowContext> {
  async execute(context: WorkflowContext) {
    return await tracer.startActiveSpan("workflow.step.analysis", async (span) => {
      span.setAttribute("step.name", "analysis");

      // Perform analysis
      const analysis = `Analysis of: ${context.enrichedData.original}`;
      context.analysis = analysis;

      span.setAttribute("step.analysis.length", analysis.length);
      span.end();
      return context;
    });
  }
}

// Create and run workflow
class TracedWorkflow extends Workflow<WorkflowContext> {
  constructor() {
    super({
      name: "data_processing_workflow",
      steps: [
        new DataEnrichmentStep(),
        new AnalysisStep(),
      ],
    });
  }

  async run(input: string) {
    return await tracer.startActiveSpan("mastra.workflow.run", async (span) => {
      span.setAttributes({
        "workflow.name": this.name,
        "workflow.steps.count": this.steps.length,
        "workflow.input": input.substring(0, 100),
      });

      try {
        const context: WorkflowContext = { input };
        
        // Execute workflow
        const result = await super.execute(context);
        
        span.setAttributes({
          "workflow.success": true,
          "workflow.completed": true,
        });
        
        return result;
      } catch (error) {
        span.recordException(error as Error);
        span.setStatus({ code: 2, message: "Workflow failed" }); // 2 = SpanStatusCode.ERROR
        throw error;
      } finally {
        span.end();
      }
    });
  }
}

Parallel Pipeline Execution

import { ParallelPipeline } from "mastra";
import { tracer } from "./lib/tracing";

class TracedParallelPipeline extends ParallelPipeline {
  async run(inputs: string[]) {
    return await tracer.startActiveSpan("mastra.parallel_pipeline", async (span) => {
      span.setAttributes({
        "pipeline.type": "parallel",
        "pipeline.branches": inputs.length,
      });

      try {
        // Execute pipelines in parallel
        const results = await Promise.all(
          inputs.map((input, index) =>
            this.executeBranch(input, index)
          )
        );

        span.setAttributes({
          "pipeline.success": true,
          "pipeline.results.count": results.length,
        });

        return results;
      } catch (error) {
        span.recordException(error as Error);
        throw error;
      } finally {
        span.end();
      }
    });
  }

  private async executeBranch(input: string, index: number) {
    return await tracer.startActiveSpan(`pipeline.branch.${index}`, async (span) => {
      span.setAttributes({
        "branch.index": index,
        "branch.input": input.substring(0, 50),
      });

      // Process branch
      const result = await this.processBranch(input);

      span.setAttribute("branch.success", true);
      span.end();
      return result;
    });
  }

  private async processBranch(input: string): Promise<string> {
    // Your branch processing logic
    return `Processed: ${input}`;
  }
}

Event-Driven Pipeline

import { EventDrivenPipeline, Event } from "mastra";
import { tracer } from "./lib/tracing";

class TracedEventPipeline extends EventDrivenPipeline {
  constructor() {
    super();
    this.setupEventHandlers();
  }

  private setupEventHandlers() {
    this.on("data:received", this.handleDataReceived.bind(this));
    this.on("processing:complete", this.handleProcessingComplete.bind(this));
  }

  private async handleDataReceived(event: Event) {
    await tracer.startActiveSpan("pipeline.event.data_received", async (span) => {
      span.setAttributes({
        "event.type": event.type,
        "event.timestamp": event.timestamp,
        "event.data.size": JSON.stringify(event.data).length,
      });

      // Process received data
      await this.processData(event.data);
      
      // Emit next event
      this.emit("processing:started", { ...event.data, status: "processing" });
      span.end();
    });
  }

  private async handleProcessingComplete(event: Event) {
    await tracer.startActiveSpan("pipeline.event.processing_complete", async (span) => {
      span.setAttributes({
        "event.type": event.type,
        "event.status": "complete",
      });

      // Final processing
      console.log("Processing complete:", event.data);
      span.end();
    });
  }

  private async processData(data: any) {
    // Data processing logic
    return data;
  }
}

Error Handling and Retry Logic

import { Pipeline, RetryStrategy } from "mastra";
import { tracer } from "./lib/tracing";

class ResilientPipeline extends Pipeline {
  private retryStrategy: RetryStrategy = {
    maxAttempts: 3,
    backoffMultiplier: 2,
    initialDelay: 1000,
  };

  async runWithRetry(input: string) {
    return await tracer.startActiveSpan("mastra.pipeline.resilient", async (span) => {
      let attempts = 0;
      let lastError: Error | null = null;

      while (attempts < this.retryStrategy.maxAttempts) {
        attempts++;
        
        const attemptSpan = tracer.startSpan(`pipeline.attempt.${attempts}`);
        attemptSpan.setAttributes({
          "attempt.number": attempts,
          "attempt.max": this.retryStrategy.maxAttempts,
        });

        try {
          const result = await this.execute(input);
          
          attemptSpan.setAttribute("attempt.success", true);
          span.setAttributes({
            "pipeline.attempts": attempts,
            "pipeline.success": true,
          });
          
          attemptSpan.end();
          span.end();
          return result;
        } catch (error) {
          lastError = error as Error;
          attemptSpan.recordException(lastError);
          attemptSpan.setAttribute("attempt.success", false);
          attemptSpan.end();

          if (attempts < this.retryStrategy.maxAttempts) {
            const delay = this.retryStrategy.initialDelay * 
              Math.pow(this.retryStrategy.backoffMultiplier, attempts - 1);
            
            span.addEvent(`Retrying after ${delay}ms`, {
              "retry.delay": delay,
              "retry.attempt": attempts,
            });
            
            await new Promise(resolve => setTimeout(resolve, delay));
          }
        }
      }

      span.setAttributes({
        "pipeline.attempts": attempts,
        "pipeline.success": false,
        "pipeline.error": lastError?.message || "Unknown error",
      });
      
      span.end();
      throw lastError || new Error("Pipeline failed after max retries");
    });
  }

  private async execute(input: string): Promise<string> {
    // Pipeline execution logic that might fail
    if (Math.random() > 0.7) {
      throw new Error("Random failure for demonstration");
    }
    return `Processed: ${input}`;
  }
}
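The delay computed in the retry loop above is plain exponential backoff: `initialDelay * backoffMultiplier^(attempt - 1)`. Factored out as a standalone function (a sketch, not a Mastra API), the schedule is easy to see and test:

```typescript
// Exponential backoff: 1000ms, 2000ms, 4000ms, ... for multiplier 2
function backoffDelay(
  initialDelay: number,
  backoffMultiplier: number,
  attempt: number
): number {
  return initialDelay * Math.pow(backoffMultiplier, attempt - 1);
}

const delays = [1, 2, 3].map((attempt) => backoffDelay(1000, 2, attempt));
```

In production you would usually add jitter (a random fraction of each delay) so that many instances retrying at once do not hit the upstream service in lockstep.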

Stream Processing Pipeline

import { StreamPipeline } from "mastra";
import { tracer } from "./lib/tracing";
import { Readable, Transform } from "stream";

class TracedStreamPipeline extends StreamPipeline {
  async processStream(inputStream: Readable) {
    return await tracer.startActiveSpan("mastra.pipeline.stream", async (span) => {
      let itemsProcessed = 0;
      let bytesProcessed = 0;

      span.setAttributes({
        "pipeline.type": "stream",
        "stream.started": new Date().toISOString(),
      });

      // Create transform stream with tracing
      const transformStream = new Transform({
        async transform(chunk, encoding, callback) {
          const itemSpan = tracer.startSpan("stream.item.process");
          
          try {
            itemsProcessed++;
            bytesProcessed += chunk.length;
            
            itemSpan.setAttributes({
              "item.number": itemsProcessed,
              "item.size": chunk.length,
            });

            // Process chunk
            const processed = await processChunk(chunk);
            
            itemSpan.setAttribute("item.success", true);
            callback(null, processed);
          } catch (error) {
            itemSpan.recordException(error as Error);
            callback(error as Error);
          } finally {
            itemSpan.end();
          }
        }
      });

      // Process stream
      return new Promise((resolve, reject) => {
        inputStream
          .pipe(transformStream)
          .on("finish", () => {
            span.setAttributes({
              "stream.items.processed": itemsProcessed,
              "stream.bytes.processed": bytesProcessed,
              "stream.completed": true,
            });
            span.end();
            resolve({ itemsProcessed, bytesProcessed });
          })
          .on("error", (error) => {
            span.recordException(error);
            span.end();
            reject(error);
          });
      });
    });
  }
}

async function processChunk(chunk: Buffer): Promise<Buffer> {
  // Process individual chunk
  return chunk;
}

Next Steps

  • Verify traces: Check your Orq.ai dashboard to see incoming traces
  • Add custom attributes: Enhance traces with business-specific metadata
  • Set up alerts: Configure monitoring for pipeline failures
  • Explore metrics: Use trace data for workflow optimization
  • Monitor performance: Track pipeline execution times and bottlenecks
