AI Gateway
Streaming
Real-time streaming responses from the AI Gateway
Streaming Responses
Stream AI responses in real-time for a better user experience. Instead of waiting for the complete response, receive tokens as they're generated.
Why Streaming?
- Better UX - Users see responses immediately
- Perceived speed - App feels faster and more responsive
- Cancel early - Stop generation if response isn't helpful
- Memory efficient - Process tokens as they arrive
Python Streaming
Basic Streaming
from openai import OpenAI

# Point the OpenAI SDK at the gateway by overriding the base URL.
client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1",
)

# With stream=True the call returns an iterable of chunks rather than a
# single completed response.
stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "Write a short poem about coding"}],
    stream=True,
)

for chunk in stream:
    # A chunk may arrive with an empty `choices` list (for example a
    # usage-only final chunk); skip it instead of failing on choices[0].
    if not chunk.choices:
        continue
    text = chunk.choices[0].delta.content
    if text:
        # flush=True so each piece shows up immediately instead of sitting
        # in the stdout buffer.
        print(text, end="", flush=True)
print() # New line at end
Async Streaming
from openai import AsyncOpenAI
import asyncio

# Async variant of the client, pointed at the gateway base URL.
client = AsyncOpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1",
)


async def stream_response():
    """Request a streamed chat completion and print each text delta as it arrives."""
    stream = await client.chat.completions.create(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Explain recursion"}],
        stream=True,
    )
    async for chunk in stream:
        # A chunk may arrive with an empty `choices` list (for example a
        # usage-only final chunk); skip it instead of failing on choices[0].
        if not chunk.choices:
            continue
        text = chunk.choices[0].delta.content
        if text:
            print(text, end="", flush=True)
asyncio.run(stream_response())
Collecting Full Response
from openai import OpenAI

client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1",
)

stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "Write a haiku"}],
    stream=True,
)

# Print each piece as it arrives and keep it so the whole reply can be
# assembled once the stream ends.
parts = []
for chunk in stream:
    # A chunk may arrive with an empty `choices` list (for example a
    # usage-only final chunk); skip it instead of failing on choices[0].
    if not chunk.choices:
        continue
    content = chunk.choices[0].delta.content
    if content:
        print(content, end="", flush=True)
        parts.append(content)

# Join once at the end rather than growing a string chunk by chunk.
full_response = "".join(parts)
print(f"\n\nFull response: {full_response}")
JavaScript Streaming
Basic Streaming
import OpenAI from 'openai';
const client = new OpenAI({
apiKey: 'agnic_tok_YOUR_TOKEN',
baseURL: 'https://api.agnic.ai/v1'
});
const stream = await client.chat.completions.create({
model: 'openai/gpt-4o',
messages: [{ role: 'user', content: 'Write a poem about JavaScript' }],
stream: true
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
process.stdout.write(content);
}
}
Browser (React Example)
import OpenAI from 'openai';
import { useState } from 'react';
function ChatComponent() {
const [response, setResponse] = useState('');
const [loading, setLoading] = useState(false);
// Sends `prompt` to the gateway and appends each streamed text delta to
// the `response` state while `loading` is true.
const handleSubmit = async (prompt: string) => {
  setLoading(true);
  setResponse('');
  try {
    const client = new OpenAI({
      apiKey: 'agnic_tok_YOUR_TOKEN',
      baseURL: 'https://api.agnic.ai/v1',
      dangerouslyAllowBrowser: true // Only for demos!
    });
    const stream = await client.chat.completions.create({
      model: 'openai/gpt-4o',
      messages: [{ role: 'user', content: prompt }],
      stream: true
    });
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        // Functional update so every delta is appended to the latest text.
        setResponse(prev => prev + content);
      }
    }
  } finally {
    // Clear the loading indicator even when the request or the stream
    // throws; the error itself still propagates to the caller.
    setLoading(false);
  }
};
return (
<div>
<pre>{response}</pre>
{loading && <span>Generating...</span>}
</div>
);
}
Never expose API tokens in browser code in production. Use a backend proxy instead.
cURL Streaming
curl https://api.agnic.ai/v1/chat/completions \
-H "Authorization: Bearer agnic_tok_YOUR_TOKEN" \
-H "Content-Type: application/json" \
-N \
-d '{
"model": "openai/gpt-4o",
"messages": [{"role": "user", "content": "Write a story"}],
"stream": true
}'
The -N (--no-buffer) flag disables curl's output buffering so chunks are printed as they arrive.
Stream Event Format
Each chunk follows the Server-Sent Events (SSE) format:
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}
data: [DONE]
Chunk Structure
{
"id": "chatcmpl-123",
"object": "chat.completion.chunk",
"model": "openai/gpt-4o",
"choices": [{
"index": 0,
"delta": {
"content": "Hello"
},
"finish_reason": null
}]
}
Final Chunk
The last chunk has finish_reason set and an empty delta:
{
"choices": [{
"delta": {},
"finish_reason": "stop"
}]
}
Handling Stream Events
Python with Events
from openai import OpenAI
client = OpenAI(
api_key="agnic_tok_YOUR_TOKEN",
base_url="https://api.agnic.ai/v1"
)
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "Count to 5"}],
stream=True
)
for chunk in stream:
choice = chunk.choices[0]
# Check if content
if choice.delta.content:
print(f"Content: {choice.delta.content}")
# Check if done
if choice.finish_reason:
print(f"Finished: {choice.finish_reason}")Canceling Streams
Python
from openai import OpenAI

client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1",
)

stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "Write a very long story"}],
    stream=True,
)

# Counts content-bearing chunks; used here as an approximation of tokens.
token_count = 0
for chunk in stream:
    # A chunk may arrive with an empty `choices` list (for example a
    # usage-only final chunk); skip it instead of failing on choices[0].
    if not chunk.choices:
        continue
    text = chunk.choices[0].delta.content
    if text:
        print(text, end="", flush=True)
        token_count += 1

    # Cancel after 100 tokens
    if token_count > 100:
        print("\n\n[Cancelled]")
        # Breaking out only stops reading; close the stream explicitly so
        # the underlying HTTP response is released.
        stream.close()
        break
JavaScript
const controller = new AbortController();
const stream = await client.chat.completions.create({
model: 'openai/gpt-4o',
messages: [{ role: 'user', content: 'Write a long story' }],
stream: true
}, { signal: controller.signal });
let tokenCount = 0;
try {
for await (const chunk of stream) {
if (chunk.choices[0]?.delta?.content) {
process.stdout.write(chunk.choices[0].delta.content);
tokenCount++;
}
if (tokenCount > 100) {
controller.abort();
console.log('\n\n[Cancelled]');
}
}
} catch (e) {
if (e.name !== 'AbortError') throw e;
}Error Handling in Streams
from openai import OpenAI, APIError
client = OpenAI(
api_key="agnic_tok_YOUR_TOKEN",
base_url="https://api.agnic.ai/v1"
)
try:
stream = client.chat.completions.create(
model="openai/gpt-4o",
messages=[{"role": "user", "content": "Hello"}],
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end="")
except APIError as e:
if e.status_code == 402:
print("Insufficient balance - check your balance")
else:
print(f"Stream error: {e}")Best Practices
- Always handle partial responses - Streams can disconnect mid-response
- Implement timeouts - Don't wait forever for chunks
- Show loading state - Indicate when waiting for first chunk
- Buffer for display - Some UI frameworks work better with small batches
- Track usage - Final chunk may include token usage info