Streaming

📖 This page describes features extending the AI Proxy, which provides a unified API for accessing multiple AI providers. To learn more, see AI Proxy.

Quick Start

Enable real-time response streaming for better user experience.

// Basic streaming
const stream = await openai.chat.completions.create({
  model: "openai/gpt-4o",
  messages: [
    { role: "user", content: "Write a story about space exploration" },
  ],
  stream: true,
});

// Process stream chunks
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || "";
  if (content) {
    process.stdout.write(content);
  }
}

Configuration

Parameter | Type    | Required | Description
stream    | boolean | Yes      | Enable streaming responses

All models support streaming - no additional configuration needed.

Response Format

Streaming chunks:

{
  "id": "chatcmpl-123",
  "object": "chat.completion.chunk",
  "created": 1677652288,
  "model": "openai/gpt-4o",
  "choices": [
    {
      "index": 0,
      "delta": {
        "content": "Hello" // Incremental content
      },
      "finish_reason": null
    }
  ]
}

Final chunk:

{
  "choices": [
    {
      "index": 0,
      "delta": {},
      "finish_reason": "stop" // "stop", "length", "tool_calls"
    }
  ]
}
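
Consumers typically branch on finish_reason to distinguish a normal stop from truncation or a tool-call request. A minimal sketch, assuming the stream object created in the Quick Start above:

// React to the finish reason carried by the final chunk
for await (const chunk of stream) {
  const reason = chunk.choices[0]?.finish_reason;
  if (reason === "length") {
    console.warn("Response truncated by the token limit");
  } else if (reason === "tool_calls") {
    console.log("Model requested one or more tool calls");
  } else if (reason === "stop") {
    console.log("Model finished normally");
  }
}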

Code examples

cURL

curl -X POST https://api.orq.ai/v2/proxy/chat/completions \
  -H "Authorization: Bearer $ORQ_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "openai/gpt-4o",
    "messages": [
      {
        "role": "user",
        "content": "Write a detailed explanation of quantum computing"
      }
    ],
    "stream": true
  }'

Python

from openai import OpenAI
import os

openai = OpenAI(
  api_key=os.environ.get("ORQ_API_KEY"),
  base_url="https://api.orq.ai/v2/proxy"
)

# Create a streaming completion
stream = openai.chat.completions.create(
    model="openai/gpt-4o",
    messages=[
        {
            "role": "user",
            "content": "Write a detailed explanation of quantum computing"
        }
    ],
    stream=True
)

# Process the stream
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")

Node.js

import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.ORQ_API_KEY,
  baseURL: "https://api.orq.ai/v2/proxy",
});

// Create a streaming completion
const stream = await openai.chat.completions.create({
  model: "openai/gpt-4o",
  messages: [
    {
      role: "user",
      content: "Write a detailed explanation of quantum computing",
    },
  ],
  stream: true,
});

// Process the stream
for await (const chunk of stream) {
  process.stdout.write(chunk.choices[0]?.delta?.content || "");
}

Stream Processing Patterns

Basic processing

const processStream = async (stream) => {
  let fullResponse = "";

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || "";
    if (content) {
      fullResponse += content;
      console.log(content); // Real-time output
    }

    // Check for completion
    if (chunk.choices[0]?.finish_reason) {
      console.log(`\nStream finished: ${chunk.choices[0].finish_reason}`);
      break;
    }
  }

  return fullResponse;
};

With error handling

const robustStreamProcessing = async (stream, timeoutMs = 30000) => {
  let timer;

  // Rejects if no chunk arrives within timeoutMs
  const watchdog = () =>
    new Promise((_, reject) => {
      timer = setTimeout(() => reject(new Error("Stream timeout")), timeoutMs);
    });

  try {
    let response = "";
    const iterator = stream[Symbol.asyncIterator]();

    while (true) {
      // Race each chunk against the watchdog so a stalled stream fails fast
      const { value: chunk, done } = await Promise.race([
        iterator.next(),
        watchdog(),
      ]);
      clearTimeout(timer);
      if (done) break;

      if (chunk.choices[0]?.delta?.content) {
        response += chunk.choices[0].delta.content;
        updateUI(chunk.choices[0].delta.content); // update the UI with new content (placeholder)
      }

      if (chunk.choices[0]?.finish_reason) {
        break;
      }
    }

    return response;
  } catch (error) {
    clearTimeout(timer);
    console.error("Streaming error:", error);
    throw error;
  }
};

Function Calling with Streaming

Stream tool calls as they're generated:

tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {"type": "string"}
                },
                "required": ["location"]
            }
        }
    }
]

stream = openai.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "What's the weather in Paris?"}],
    tools=tools,
    stream=True
)

for chunk in stream:
    # Handle tool calls
    if chunk.choices[0].delta.tool_calls:
        tool_call = chunk.choices[0].delta.tool_calls[0]
        if tool_call.function.arguments:
            print(tool_call.function.arguments, end="")

    # Handle regular content
    elif chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
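
Because function arguments arrive incrementally, they usually need to be accumulated across chunks before they can be parsed as JSON. A minimal JavaScript sketch, assuming a streaming request with the same tools definition as above:

// Accumulate streamed tool-call arguments, then parse them once complete
const toolCalls = {}; // index -> { name, arguments }

for await (const chunk of stream) {
  for (const call of chunk.choices[0]?.delta?.tool_calls ?? []) {
    const entry = (toolCalls[call.index] ??= { name: "", arguments: "" });
    if (call.function?.name) entry.name = call.function.name;
    if (call.function?.arguments) entry.arguments += call.function.arguments;
  }

  if (chunk.choices[0]?.finish_reason === "tool_calls") {
    for (const { name, arguments: args } of Object.values(toolCalls)) {
      console.log(name, JSON.parse(args)); // e.g. get_weather { location: "Paris" }
    }
  }
}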

UI Integration Examples

React hook for streaming

import { useState, useCallback } from "react";

const useStreamingChat = () => {
  const [response, setResponse] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);

  const streamChat = useCallback(async (message) => {
    setIsStreaming(true);
    setResponse("");

    try {
      const stream = await openai.chat.completions.create({
        model: "openai/gpt-4o",
        messages: [{ role: "user", content: message }],
        stream: true,
      });

      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content || "";
        if (content) {
          setResponse((prev) => prev + content);
        }

        if (chunk.choices[0]?.finish_reason) {
          break;
        }
      }
    } catch (error) {
      console.error("Streaming failed:", error);
    } finally {
      // Always clear the streaming flag, even when the stream ends
      // without an explicit finish_reason chunk
      setIsStreaming(false);
    }
  }, []);

  return { response, isStreaming, streamChat };
};
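
A possible usage of the hook, with isStreaming driving a simple typing indicator (the component and prompt below are illustrative):

const ChatBox = () => {
  const { response, isStreaming, streamChat } = useStreamingChat();

  return (
    <div>
      <button onClick={() => streamChat("Explain quantum computing")}>
        Ask
      </button>
      {/* Show a cursor while chunks are still arriving */}
      <p>
        {response}
        {isStreaming && " ▋"}
      </p>
    </div>
  );
};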

Server-Sent Events (Browser):

// EventSource only supports GET requests, so use fetch with a readable
// stream to consume server-sent events from a POST endpoint
const streamWithSSE = async (message) => {
  const response = await fetch("/api/chat-stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });

  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;

    // Each SSE event arrives as a line prefixed with "data: ".
    // A production parser should also buffer partial lines across reads.
    for (const line of decoder.decode(value, { stream: true }).split("\n")) {
      if (!line.startsWith("data: ")) continue;

      const payload = line.slice(6).trim();
      if (payload === "[DONE]") {
        // Sent if your backend forwards the OpenAI-style end-of-stream sentinel
        console.log("Stream complete");
        return;
      }

      const data = JSON.parse(payload);
      if (data.choices[0]?.delta?.content) {
        document.getElementById("response").textContent +=
          data.choices[0].delta.content;
      }
    }
  }
};

Performance Optimization

Chunk buffering

class StreamBuffer {
  constructor(flushInterval = 50) {
    this.buffer = "";
    this.flushInterval = flushInterval;
    this.lastFlush = Date.now();
  }

  add(content) {
    this.buffer += content;

    // Flush periodically or when buffer is large
    if (
      Date.now() - this.lastFlush > this.flushInterval ||
      this.buffer.length > 100
    ) {
      this.flush();
    }
  }

  flush() {
    if (this.buffer) {
      this.onFlush(this.buffer);
      this.buffer = "";
      this.lastFlush = Date.now();
    }
  }

  onFlush(content) {
    // Override this method
    console.log(content);
  }
}
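
One way to wire the buffer into a stream loop; the onFlush override and flush interval below are illustrative, and the stream is assumed to come from a request like the one in the Quick Start:

const buffer = new StreamBuffer(50);
buffer.onFlush = (content) => {
  // Render one buffered batch instead of every tiny chunk
  process.stdout.write(content);
};

for await (const chunk of stream) {
  buffer.add(chunk.choices[0]?.delta?.content || "");
  if (chunk.choices[0]?.finish_reason) break;
}

buffer.flush(); // Flush whatever remains once the stream ends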

Memory management

const processLargeStream = async (stream, maxMemory = 1000000) => {
  let totalLength = 0;
  const chunks = [];

  for await (const chunk of stream) {
    const content = chunk.choices[0]?.delta?.content || "";

    if (content) {
      totalLength += content.length;
      chunks.push(content);

      // Prevent memory overflow
      if (totalLength > maxMemory) {
        console.warn("Stream too large, truncating");
        break;
      }
    }

    if (chunk.choices[0]?.finish_reason) {
      break;
    }
  }

  return chunks.join("");
};

Best Practices

Stream management

  • Set reasonable timeouts (30-60 seconds)
  • Implement proper error boundaries
  • Handle network interruptions gracefully
  • Provide user cancellation options (see the cancellation sketch after this list)
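
For cancellation, one option is to pass an AbortSignal with the request; recent versions of the OpenAI SDK accept a signal in the per-request options. A minimal sketch, not tied to any particular UI framework:

const controller = new AbortController();

// Wire this to a "Stop" button or keyboard shortcut in your UI
const cancelGeneration = () => controller.abort();

const stream = await openai.chat.completions.create(
  {
    model: "openai/gpt-4o",
    messages: [{ role: "user", content: "Write a long essay" }],
    stream: true,
  },
  { signal: controller.signal }, // request is aborted when the signal fires
);

try {
  for await (const chunk of stream) {
    process.stdout.write(chunk.choices[0]?.delta?.content || "");
  }
} catch (error) {
  // The SDK surfaces the abort as an error; treat it as a user cancel
  if (controller.signal.aborted) {
    console.log("\nGeneration cancelled by user");
  } else {
    throw error;
  }
}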

UI/UX considerations

  • Show typing indicators during streaming
  • Allow users to stop generation
  • Buffer small chunks for smoother display
  • Handle rapid updates efficiently

Error recovery example

const streamWithRetry = async (request, maxRetries = 3) => {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      // Re-create the stream on each attempt, then hand it to processStream
      const stream = await openai.chat.completions.create(request);
      return await processStream(stream);
    } catch (error) {
      if (attempt === maxRetries) throw error;

      console.log(`Stream attempt ${attempt} failed, retrying...`);
      await new Promise((resolve) => setTimeout(resolve, 1000 * attempt));
    }
  }
};

Troubleshooting

Stream cuts off unexpectedly

  • Check network stability
  • Verify timeout settings
  • Monitor for rate limiting
  • Check model-specific limits

Slow streaming performance

  • Optimize chunk processing
  • Reduce buffer flush frequency
  • Check network latency
  • Consider model selection

Memory issues

  • Implement chunk size limits
  • Use streaming parsers
  • Clear processed chunks
  • Monitor memory usage

Limitations

Limitation           | Impact                  | Workaround
Network interruption | Stream breaks           | Implement reconnection logic
Processing overhead  | Slight performance cost | Optimize chunk handling
Model variations     | Different chunk sizes   | Handle variable chunk lengths
Rate limiting        | Stream throttling       | Implement backoff strategies

Advanced Features

Stream with other proxy features

const advancedStream = await openai.chat.completions.create({
  model: "openai/gpt-4o",
  messages: [{ role: "user", content: "Explain machine learning" }],
  stream: true,
  orq: {
    cache: { type: "exact_match", ttl: 3600 },
    tracking: { name: "StreamingBot-v1" },
    timeout: { call_timeout: 30000 },
  },
});

Parallel streaming

const parallelStreaming = async (queries) => {
  // Start all requests concurrently and wait for the streams to open
  const streams = await Promise.all(
    queries.map((query) =>
      openai.chat.completions.create({
        model: "openai/gpt-4o",
        messages: [{ role: "user", content: query }],
        stream: true,
      }),
    ),
  );

  // Process all streams concurrently
  return Promise.all(streams.map(processStream));
};