Streaming
This page describes features extending the AI Proxy, which provides a unified API for accessing multiple AI providers. To learn more, see AI Proxy.
Quick Start
Enable real-time response streaming for a better user experience.
// Basic streaming
const stream = await openai.chat.completions.create({
model: "openai/gpt-4o",
messages: [
{ role: "user", content: "Write a story about space exploration" },
],
stream: true,
});
// Process stream chunks
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || "";
if (content) {
process.stdout.write(content);
}
}
Configuration
| Parameter | Type | Required | Description |
| --- | --- | --- | --- |
| stream | boolean | Yes | Enable streaming responses |
All models support streaming - no additional configuration needed.
Response Format
Streaming chunks:
{
"id": "chatcmpl-123",
"object": "chat.completion.chunk",
"created": 1677652288,
"model": "openai/gpt-4o",
"choices": [
{
"index": 0,
"delta": {
"content": "Hello" // Incremental content
},
"finish_reason": null
}
]
}
Final chunk:
{
"choices": [
{
"index": 0,
"delta": {},
"finish_reason": "stop" // "stop", "length", "tool_calls"
}
]
}
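The finish_reason on the final chunk tells you why generation stopped. A minimal sketch of branching on it (the handler is illustrative, not part of the proxy API):
const handleFinishReason = (reason) => {
  switch (reason) {
    case "stop":
      // Model completed its response naturally
      break;
    case "length":
      // Hit the max_tokens limit; consider asking the model to continue
      console.warn("Response truncated by token limit");
      break;
    case "tool_calls":
      // Model wants to call a tool; execute it and send the result back
      break;
    default:
      console.warn(`Unexpected finish_reason: ${reason}`);
  }
};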
Code examples
curl -X POST https://api.orq.ai/v2/proxy/chat/completions \
-H "Authorization: Bearer $ORQ_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"model": "openai/gpt-4o",
"messages": [
{
"role": "user",
"content": "Write a detailed explanation of quantum computing"
}
],
"stream": true
}'
from openai import OpenAI
import os
openai = OpenAI(
api_key=os.environ.get("ORQ_API_KEY"),
base_url="https://api.orq.ai/v2/proxy"
)
# Create a streaming completion
stream = openai.chat.completions.create(
model="openai/gpt-4o",
messages=[
{
"role": "user",
"content": "Write a detailed explanation of quantum computing"
}
],
stream=True
)
# Process the stream
for chunk in stream:
if chunk.choices[0].delta.content is not None:
print(chunk.choices[0].delta.content, end="")
import OpenAI from "openai";
const openai = new OpenAI({
apiKey: process.env.ORQ_API_KEY,
baseURL: "https://api.orq.ai/v2/proxy",
});
// Create a streaming completion
const stream = await openai.chat.completions.create({
model: "openai/gpt-4o",
messages: [
{
role: "user",
content: "Write a detailed explanation of quantum computing",
},
],
stream: true,
});
// Process the stream
for await (const chunk of stream) {
process.stdout.write(chunk.choices[0]?.delta?.content || "");
}
Stream Processing Patterns
Basic processing
const processStream = async (stream) => {
let fullResponse = "";
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || "";
if (content) {
fullResponse += content;
console.log(content); // Real-time output
}
// Check for completion
if (chunk.choices[0]?.finish_reason) {
console.log(`\nStream finished: ${chunk.choices[0].finish_reason}`);
break;
}
}
return fullResponse;
};
With error handling
const robustStreamProcessing = async (stream, timeoutMs = 30000) => {
  let response = "";
  const iterator = stream[Symbol.asyncIterator]();
  try {
    while (true) {
      // Race each read against a timer so a stalled stream fails instead of hanging
      let timer;
      const timeout = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error("Stream timeout")), timeoutMs);
      });
      const result = await Promise.race([iterator.next(), timeout]).finally(() =>
        clearTimeout(timer),
      );
      if (result.done) break;
      const chunk = result.value;
      if (chunk.choices[0]?.delta?.content) {
        response += chunk.choices[0].delta.content;
        // Update UI with new content (updateUI is application-specific)
        updateUI(chunk.choices[0].delta.content);
      }
      if (chunk.choices[0]?.finish_reason) {
        break;
      }
    }
    return response;
  } catch (error) {
    console.error("Streaming error:", error);
    throw error;
  }
};
Function Calling with Streaming
Stream tool calls as they're generated:
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get current weather",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
}
]
stream = openai.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "What's the weather in Paris?"}],
tools=tools,
stream=True
)
for chunk in stream:
# Handle tool calls
if chunk.choices[0].delta.tool_calls:
tool_call = chunk.choices[0].delta.tool_calls[0]
if tool_call.function.arguments:
print(tool_call.function.arguments, end="")
# Handle regular content
elif chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
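Tool call arguments arrive as fragments spread across many chunks, so applications typically accumulate them before executing the tool. A minimal sketch in JavaScript, mirroring the Node examples above (the accumulator shape is an assumption of this example, not part of the proxy):
const collectToolCalls = async (stream) => {
  const toolCalls = [];
  for await (const chunk of stream) {
    for (const delta of chunk.choices[0]?.delta?.tool_calls || []) {
      // The first fragment for each call carries id/name; later ones append argument text
      toolCalls[delta.index] ??= { id: "", name: "", arguments: "" };
      if (delta.id) toolCalls[delta.index].id = delta.id;
      if (delta.function?.name) toolCalls[delta.index].name = delta.function.name;
      if (delta.function?.arguments) {
        toolCalls[delta.index].arguments += delta.function.arguments;
      }
    }
  }
  // Arguments are complete JSON strings once the stream ends
  return toolCalls.map((call) => ({ ...call, arguments: JSON.parse(call.arguments) }));
};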
UI Integration Examples
React hook for streaming
import { useState, useCallback } from "react";
// `openai` is the client configured with the Orq base URL, as shown above
const useStreamingChat = () => {
  const [response, setResponse] = useState("");
  const [isStreaming, setIsStreaming] = useState(false);
  const streamChat = useCallback(async (message) => {
    setIsStreaming(true);
    setResponse("");
    try {
      const stream = await openai.chat.completions.create({
        model: "openai/gpt-4o",
        messages: [{ role: "user", content: message }],
        stream: true,
      });
      for await (const chunk of stream) {
        const content = chunk.choices[0]?.delta?.content || "";
        if (content) {
          setResponse((prev) => prev + content);
        }
        if (chunk.choices[0]?.finish_reason) {
          break;
        }
      }
    } catch (error) {
      console.error("Streaming failed:", error);
    } finally {
      // Reset the indicator whether the stream finished, failed, or ended early
      setIsStreaming(false);
    }
  }, []);
  return { response, isStreaming, streamChat };
};
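The hook can then drive a component, for example (the markup is illustrative):
const ChatBox = () => {
  const { response, isStreaming, streamChat } = useStreamingChat();
  return (
    <div>
      <button
        onClick={() => streamChat("Explain quantum computing")}
        disabled={isStreaming}
      >
        {isStreaming ? "Streaming..." : "Ask"}
      </button>
      <p>{response}</p>
    </div>
  );
};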
Server-Sent Events (Browser)
The native EventSource API only supports GET requests without custom headers or a body, so a POST-based chat stream is read with fetch and the response body stream:
const streamWithSSE = async (message) => {
  const response = await fetch("/api/chat-stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ message }),
  });
  if (!response.ok) {
    throw new Error(`Request failed: ${response.status}`);
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let buffer = "";
  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split("\n");
    buffer = lines.pop(); // keep any partial line for the next read
    for (const line of lines) {
      if (!line.startsWith("data: ")) continue;
      const payload = line.slice(6).trim();
      if (payload === "[DONE]") continue; // terminal sentinel, if the backend sends one
      const data = JSON.parse(payload);
      if (data.choices[0]?.delta?.content) {
        document.getElementById("response").textContent +=
          data.choices[0].delta.content;
      }
      if (data.choices[0]?.finish_reason) {
        console.log("Stream complete");
      }
    }
  }
};
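The /api/chat-stream endpoint above is application code, not part of the proxy. As one possible shape, a minimal Express sketch (Express and the route name are assumptions) that forwards proxy chunks to the browser as SSE:
import express from "express";
import OpenAI from "openai";

const app = express();
app.use(express.json());

const openai = new OpenAI({
  apiKey: process.env.ORQ_API_KEY,
  baseURL: "https://api.orq.ai/v2/proxy",
});

app.post("/api/chat-stream", async (req, res) => {
  res.setHeader("Content-Type", "text/event-stream");
  res.setHeader("Cache-Control", "no-cache");
  const stream = await openai.chat.completions.create({
    model: "openai/gpt-4o",
    messages: [{ role: "user", content: req.body.message }],
    stream: true,
  });
  // Re-emit each chunk as an SSE "data:" line
  for await (const chunk of stream) {
    res.write(`data: ${JSON.stringify(chunk)}\n\n`);
  }
  res.write("data: [DONE]\n\n");
  res.end();
});

app.listen(3000);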
Performance Optimization
Chunk buffering
class StreamBuffer {
constructor(flushInterval = 50) {
this.buffer = "";
this.flushInterval = flushInterval;
this.lastFlush = Date.now();
}
add(content) {
this.buffer += content;
// Flush periodically or when buffer is large
if (
Date.now() - this.lastFlush > this.flushInterval ||
this.buffer.length > 100
) {
this.flush();
}
}
flush() {
if (this.buffer) {
this.onFlush(this.buffer);
this.buffer = "";
this.lastFlush = Date.now();
}
}
onFlush(content) {
// Override this method
console.log(content);
}
}
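A usage sketch: place the buffer between the stream loop and the UI so renders happen at most every flushInterval milliseconds (updateUI is a placeholder for your own render call):
const buffer = new StreamBuffer(50);
buffer.onFlush = (text) => updateUI(text); // render batched text instead of every chunk

for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content || "";
  if (content) buffer.add(content);
  if (chunk.choices[0]?.finish_reason) break;
}
buffer.flush(); // emit any remaining buffered content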
Memory management
const processLargeStream = async (stream, maxMemory = 1000000) => {
let totalLength = 0;
const chunks = [];
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || "";
if (content) {
totalLength += content.length;
chunks.push(content);
// Prevent memory overflow
if (totalLength > maxMemory) {
console.warn("Stream too large, truncating");
break;
}
}
if (chunk.choices[0]?.finish_reason) {
break;
}
}
return chunks.join("");
};
Best Practices
Stream management
- Set reasonable timeouts (30-60 seconds)
- Implement proper error boundaries
- Handle network interruptions gracefully
- Provide user cancellation options (see the sketch after this list)
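For cancellation, the OpenAI Node SDK accepts an AbortController signal in the per-request options, which also terminates an in-flight stream. A minimal sketch (the button wiring is up to your UI):
const controller = new AbortController();

// Wire this to a "Stop" button in your UI
const cancel = () => controller.abort();

try {
  const stream = await openai.chat.completions.create(
    {
      model: "openai/gpt-4o",
      messages: [{ role: "user", content: "Explain machine learning" }],
      stream: true,
    },
    { signal: controller.signal },
  );
  for await (const chunk of stream) {
    process.stdout.write(chunk.choices[0]?.delta?.content || "");
  }
} catch (error) {
  if (controller.signal.aborted) {
    console.log("Generation cancelled by user");
  } else {
    throw error;
  }
}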
UI/UX considerations
- Show typing indicators during streaming
- Allow users to stop generation
- Buffer small chunks for smoother display
- Handle rapid updates efficiently
Error recovery example
const streamWithRetry = async (requestFn, maxRetries = 3) => {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      // Recreate the stream on each attempt; a consumed stream cannot be replayed
      const stream = await requestFn();
      return await processStream(stream);
    } catch (error) {
      if (attempt === maxRetries) throw error;
      console.log(`Stream attempt ${attempt} failed, retrying...`);
      await new Promise((resolve) => setTimeout(resolve, 1000 * attempt));
    }
  }
};
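Pass a function that recreates the request, so each retry opens a fresh stream:
const answer = await streamWithRetry(() =>
  openai.chat.completions.create({
    model: "openai/gpt-4o",
    messages: [{ role: "user", content: "Summarize the latest AI trends" }],
    stream: true,
  }),
);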
Troubleshooting
Stream cuts off unexpectedly
- Check network stability
- Verify timeout settings
- Monitor for rate limiting
- Check model-specific limits
Slow streaming performance
- Optimize chunk processing
- Reduce buffer flush frequency
- Check network latency
- Consider model selection
Memory issues
- Implement chunk size limits
- Use streaming parsers
- Clear processed chunks
- Monitor memory usage
Limitations
| Limitation | Impact | Workaround |
| --- | --- | --- |
| Network interruption | Stream breaks | Implement reconnection logic |
| Processing overhead | Slight performance cost | Optimize chunk handling |
| Model variations | Different chunk sizes | Handle variable chunk lengths |
| Rate limiting | Stream throttling | Implement backoff strategies |
Advanced Features
Stream with other proxy features
const advancedStream = await openai.chat.completions.create({
model: "openai/gpt-4o",
messages: [{ role: "user", content: "Explain machine learning" }],
stream: true,
orq: {
cache: { type: "exact_match", ttl: 3600 },
tracking: { name: "StreamingBot-v1" },
timeout: { call_timeout: 30000 },
},
});
Parallel streaming
const parallelStreaming = async (queries) => {
  // Open all streams first, then consume them concurrently
  const streams = await Promise.all(
    queries.map((query) =>
      openai.chat.completions.create({
        model: "openai/gpt-4o",
        messages: [{ role: "user", content: query }],
        stream: true,
      }),
    ),
  );
  // Process all streams concurrently
  return Promise.all(streams.map((stream) => processStream(stream)));
};