Mastra
Integrate Orq.ai with Mastra (TypeScript) using OpenTelemetry
Getting Started
Mastra is a TypeScript framework for building AI-powered applications with pipelines, agents, and workflows. Tracing Mastra with Orq.ai provides comprehensive insights into pipeline execution, agent performance, workflow orchestration, and system reliability to optimize your AI automation workflows.
Prerequisites
Before you begin, ensure you have:
- An Orq.ai account and API key
- Node.js 16+ and TypeScript support
- Mastra installed in your project
- API keys for your LLM providers and external services
Install Dependencies
# Core Mastra framework
npm install mastra
# OpenTelemetry packages for Node.js
npm install @opentelemetry/api @opentelemetry/sdk-node @opentelemetry/exporter-trace-otlp-http
# Additional OpenTelemetry instrumentation
npm install @opentelemetry/semantic-conventions @opentelemetry/resources
# LLM providers and tools (choose what you need)
npm install openai @anthropic-ai/sdk axios
Configure Orq.ai
Set up your environment variables to connect to Orq.ai's OpenTelemetry collector:
Unix/Linux/macOS:
export OTEL_EXPORTER_OTLP_ENDPOINT="https://api.orq.ai/v2/otel"
export OTEL_EXPORTER_OTLP_HEADERS="Authorization=Bearer <ORQ_API_KEY>"
export OTEL_RESOURCE_ATTRIBUTES="service.name=mastra-app,service.version=1.0.0"
export OPENAI_API_KEY="<YOUR_OPENAI_API_KEY>"
Windows (PowerShell):
$env:OTEL_EXPORTER_OTLP_ENDPOINT = "https://api.orq.ai/v2/otel"
$env:OTEL_EXPORTER_OTLP_HEADERS = "Authorization=Bearer <ORQ_API_KEY>"
$env:OTEL_RESOURCE_ATTRIBUTES = "service.name=mastra-app,service.version=1.0.0"
$env:OPENAI_API_KEY = "<YOUR_OPENAI_API_KEY>"
Using .env file:
OTEL_EXPORTER_OTLP_ENDPOINT=https://api.orq.ai/v2/otel
OTEL_EXPORTER_OTLP_HEADERS=Authorization=Bearer <ORQ_API_KEY>
OTEL_RESOURCE_ATTRIBUTES=service.name=mastra-app,service.version=1.0.0
OPENAI_API_KEY=<YOUR_OPENAI_API_KEY>
Integrations
Mastra supports manual OpenTelemetry integration for comprehensive observability:
Manual OpenTelemetry Setup
Create comprehensive tracing configuration:
// lib/tracing.ts
import { NodeSDK } from "@opentelemetry/sdk-node";
import { OTLPTraceExporter } from "@opentelemetry/exporter-trace-otlp-http";
import { Resource } from "@opentelemetry/resources";
import { SemanticResourceAttributes } from "@opentelemetry/semantic-conventions";
import { diag, DiagConsoleLogger, DiagLogLevel, trace } from "@opentelemetry/api";
import { BatchSpanProcessor } from "@opentelemetry/sdk-trace-node";
// Enable debugging (optional)
diag.setLogger(new DiagConsoleLogger(), DiagLogLevel.INFO);
// Configure resource
const resource = new Resource({
[SemanticResourceAttributes.SERVICE_NAME]: "mastra-app",
[SemanticResourceAttributes.SERVICE_VERSION]: "1.0.0",
[SemanticResourceAttributes.DEPLOYMENT_ENVIRONMENT]: process.env.NODE_ENV || "development",
});
// Configure exporter
const traceExporter = new OTLPTraceExporter({
url: process.env.OTEL_EXPORTER_OTLP_ENDPOINT + "/v1/traces",
headers: {
Authorization: process.env.OTEL_EXPORTER_OTLP_HEADERS?.split("=")[1] || ""
}
});
// Initialize SDK
const sdk = new NodeSDK({
resource,
traceExporter,
spanProcessor: new BatchSpanProcessor(traceExporter),
});
// Start the SDK
sdk.start();
// Export tracer for use in application
export const tracer = trace.getTracer("mastra", "1.0.0");
export default sdk;
Examples
Basic Pipeline with Tracing
import { Mastra, Pipeline } from "mastra";
import { tracer } from "./lib/tracing";
import OpenAI from "openai";
const openai = new OpenAI();
// Initialize Mastra
const mastra = new Mastra({
llm: {
provider: "openai",
model: "gpt-4",
},
});
// Create a traced pipeline
class TracedPipeline extends Pipeline {
async run(input: string) {
return await tracer.startActiveSpan("mastra.pipeline.run", async (span) => {
try {
// Add pipeline attributes
span.setAttributes({
"pipeline.name": this.name,
"pipeline.input": input.substring(0, 100),
"pipeline.stage": "initialization",
});
// Process through pipeline stages
const processed = await this.processStages(input);
span.setAttributes({
"pipeline.success": true,
"pipeline.output.length": processed.length,
});
return processed;
} catch (error) {
span.recordException(error as Error);
span.setStatus({ code: 2, message: "Pipeline failed" });
throw error;
} finally {
span.end();
}
});
}
private async processStages(input: string): Promise<string> {
// Stage 1: Preprocessing
const preprocessed = await tracer.startActiveSpan("pipeline.stage.preprocess", async (span) => {
span.setAttribute("stage.type", "preprocess");
// Your preprocessing logic
return input.toLowerCase().trim();
});
// Stage 2: LLM Processing
const processed = await tracer.startActiveSpan("pipeline.stage.llm", async (span) => {
span.setAttributes({
"stage.type": "llm",
"llm.model": "gpt-4",
});
const completion = await openai.chat.completions.create({
model: "gpt-4",
messages: [{ role: "user", content: preprocessed }],
});
return completion.choices[0].message.content || "";
});
return processed;
}
}
AI Agent with Tool Usage
import { Agent, Tool } from "mastra";
import { tracer } from "./lib/tracing";
import axios from "axios";
// Define tools with tracing
class WeatherTool extends Tool {
name = "weather";
description = "Get current weather for a location";
async execute(location: string) {
return await tracer.startActiveSpan("mastra.tool.weather", async (span) => {
span.setAttributes({
"tool.name": this.name,
"tool.input.location": location,
});
try {
const response = await axios.get(`https://api.weather.com/v1/location/${location}`);
span.setAttribute("tool.success", true);
return response.data;
} catch (error) {
span.recordException(error as Error);
throw error;
}
});
}
}
// Create traced agent
class TracedAgent extends Agent {
constructor() {
super({
name: "assistant",
tools: [new WeatherTool()],
systemPrompt: "You are a helpful assistant with access to weather data.",
});
}
async processMessage(message: string) {
return await tracer.startActiveSpan("mastra.agent.process", async (span) => {
span.setAttributes({
"agent.name": this.name,
"agent.message": message.substring(0, 100),
"agent.tools.count": this.tools.length,
});
try {
// Process message with potential tool usage
const response = await this.run(message);
span.setAttributes({
"agent.success": true,
"agent.response.length": response.length,
});
return response;
} catch (error) {
span.recordException(error as Error);
throw error;
}
});
}
}
Complex Workflow Orchestration
import { Workflow, Step } from "mastra";
import { tracer } from "./lib/tracing";
interface WorkflowContext {
input: string;
enrichedData?: any;
analysis?: string;
output?: string;
}
// Define workflow steps with tracing
class DataEnrichmentStep extends Step<WorkflowContext> {
async execute(context: WorkflowContext) {
return await tracer.startActiveSpan("workflow.step.enrichment", async (span) => {
span.setAttributes({
"step.name": "data_enrichment",
"step.input": context.input.substring(0, 100),
});
// Simulate data enrichment
const enrichedData = {
original: context.input,
metadata: { timestamp: new Date().toISOString() },
enriched: true,
};
context.enrichedData = enrichedData;
span.setAttribute("step.enriched", true);
return context;
});
}
}
class AnalysisStep extends Step<WorkflowContext> {
async execute(context: WorkflowContext) {
return await tracer.startActiveSpan("workflow.step.analysis", async (span) => {
span.setAttribute("step.name", "analysis");
// Perform analysis
const analysis = `Analysis of: ${context.enrichedData.original}`;
context.analysis = analysis;
span.setAttribute("step.analysis.length", analysis.length);
return context;
});
}
}
// Create and run workflow
class TracedWorkflow extends Workflow<WorkflowContext> {
constructor() {
super({
name: "data_processing_workflow",
steps: [
new DataEnrichmentStep(),
new AnalysisStep(),
],
});
}
async run(input: string) {
return await tracer.startActiveSpan("mastra.workflow.run", async (span) => {
span.setAttributes({
"workflow.name": this.name,
"workflow.steps.count": this.steps.length,
"workflow.input": input.substring(0, 100),
});
try {
const context: WorkflowContext = { input };
// Execute workflow
const result = await super.execute(context);
span.setAttributes({
"workflow.success": true,
"workflow.completed": true,
});
return result;
} catch (error) {
span.recordException(error as Error);
span.setStatus({ code: 2, message: "Workflow failed" });
throw error;
}
});
}
}
Parallel Pipeline Execution
import { ParallelPipeline } from "mastra";
import { tracer } from "./lib/tracing";
class TracedParallelPipeline extends ParallelPipeline {
async run(inputs: string[]) {
return await tracer.startActiveSpan("mastra.parallel_pipeline", async (span) => {
span.setAttributes({
"pipeline.type": "parallel",
"pipeline.branches": inputs.length,
});
try {
// Execute pipelines in parallel
const results = await Promise.all(
inputs.map((input, index) =>
this.executeBranch(input, index)
)
);
span.setAttributes({
"pipeline.success": true,
"pipeline.results.count": results.length,
});
return results;
} catch (error) {
span.recordException(error as Error);
throw error;
}
});
}
private async executeBranch(input: string, index: number) {
return await tracer.startActiveSpan(`pipeline.branch.${index}`, async (span) => {
span.setAttributes({
"branch.index": index,
"branch.input": input.substring(0, 50),
});
// Process branch
const result = await this.processBranch(input);
span.setAttribute("branch.success", true);
return result;
});
}
private async processBranch(input: string): Promise<string> {
// Your branch processing logic
return `Processed: ${input}`;
}
}
Event-Driven Pipeline
import { EventDrivenPipeline, Event } from "mastra";
import { tracer } from "./lib/tracing";
class TracedEventPipeline extends EventDrivenPipeline {
constructor() {
super();
this.setupEventHandlers();
}
private setupEventHandlers() {
this.on("data:received", this.handleDataReceived.bind(this));
this.on("processing:complete", this.handleProcessingComplete.bind(this));
}
private async handleDataReceived(event: Event) {
await tracer.startActiveSpan("pipeline.event.data_received", async (span) => {
span.setAttributes({
"event.type": event.type,
"event.timestamp": event.timestamp,
"event.data.size": JSON.stringify(event.data).length,
});
// Process received data
await this.processData(event.data);
// Emit next event
this.emit("processing:started", { ...event.data, status: "processing" });
});
}
private async handleProcessingComplete(event: Event) {
await tracer.startActiveSpan("pipeline.event.processing_complete", async (span) => {
span.setAttributes({
"event.type": event.type,
"event.status": "complete",
});
// Final processing
console.log("Processing complete:", event.data);
});
}
private async processData(data: any) {
// Data processing logic
return data;
}
}
Error Handling and Retry Logic
import { Pipeline, RetryStrategy } from "mastra";
import { tracer } from "./lib/tracing";
class ResilientPipeline extends Pipeline {
private retryStrategy: RetryStrategy = {
maxAttempts: 3,
backoffMultiplier: 2,
initialDelay: 1000,
};
async runWithRetry(input: string) {
return await tracer.startActiveSpan("mastra.pipeline.resilient", async (span) => {
let attempts = 0;
let lastError: Error | null = null;
while (attempts < this.retryStrategy.maxAttempts) {
attempts++;
const attemptSpan = tracer.startSpan(`pipeline.attempt.${attempts}`);
attemptSpan.setAttributes({
"attempt.number": attempts,
"attempt.max": this.retryStrategy.maxAttempts,
});
try {
const result = await this.execute(input);
attemptSpan.setAttribute("attempt.success", true);
span.setAttributes({
"pipeline.attempts": attempts,
"pipeline.success": true,
});
attemptSpan.end();
return result;
} catch (error) {
lastError = error as Error;
attemptSpan.recordException(lastError);
attemptSpan.setAttribute("attempt.success", false);
attemptSpan.end();
if (attempts < this.retryStrategy.maxAttempts) {
const delay = this.retryStrategy.initialDelay *
Math.pow(this.retryStrategy.backoffMultiplier, attempts - 1);
span.addEvent(`Retrying after ${delay}ms`, {
"retry.delay": delay,
"retry.attempt": attempts,
});
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
span.setAttributes({
"pipeline.attempts": attempts,
"pipeline.success": false,
"pipeline.error": lastError?.message || "Unknown error",
});
throw lastError || new Error("Pipeline failed after max retries");
});
}
private async execute(input: string): Promise<string> {
// Pipeline execution logic that might fail
if (Math.random() > 0.7) {
throw new Error("Random failure for demonstration");
}
return `Processed: ${input}`;
}
}
Stream Processing Pipeline
import { StreamPipeline } from "mastra";
import { tracer } from "./lib/tracing";
import { Readable, Transform } from "stream";
class TracedStreamPipeline extends StreamPipeline {
async processStream(inputStream: Readable) {
return await tracer.startActiveSpan("mastra.pipeline.stream", async (span) => {
let itemsProcessed = 0;
let bytesProcessed = 0;
span.setAttributes({
"pipeline.type": "stream",
"stream.started": new Date().toISOString(),
});
// Create transform stream with tracing
const transformStream = new Transform({
async transform(chunk, encoding, callback) {
const itemSpan = tracer.startSpan("stream.item.process");
try {
itemsProcessed++;
bytesProcessed += chunk.length;
itemSpan.setAttributes({
"item.number": itemsProcessed,
"item.size": chunk.length,
});
// Process chunk
const processed = await processChunk(chunk);
itemSpan.setAttribute("item.success", true);
callback(null, processed);
} catch (error) {
itemSpan.recordException(error as Error);
callback(error as Error);
} finally {
itemSpan.end();
}
}
});
// Process stream
return new Promise((resolve, reject) => {
inputStream
.pipe(transformStream)
.on("finish", () => {
span.setAttributes({
"stream.items.processed": itemsProcessed,
"stream.bytes.processed": bytesProcessed,
"stream.completed": true,
});
resolve({ itemsProcessed, bytesProcessed });
})
.on("error", (error) => {
span.recordException(error);
reject(error);
});
});
});
}
}
async function processChunk(chunk: Buffer): Promise<Buffer> {
// Process individual chunk
return chunk;
}
Next Steps
✅ Verify traces: Check your Orq.ai dashboard to see incoming traces
✅ Add custom attributes: Enhance traces with business-specific metadata
✅ Set up alerts: Configure monitoring for pipeline failures
✅ Explore metrics: Use trace data for workflow optimization
✅ Monitor performance: Track pipeline execution times and bottlenecks
Related Documentation
Support
Updated about 24 hours ago