
Streaming

Real-time streaming responses from the AI Gateway

Streaming Responses

Stream AI responses in real-time for a better user experience. Instead of waiting for the complete response, receive tokens as they're generated.

Why Streaming?

  • Better UX - Users see responses immediately
  • Perceived speed - App feels faster and more responsive
  • Cancel early - Stop generation if response isn't helpful
  • Memory efficient - Process tokens as they arrive

Python Streaming

Basic Streaming

from openai import OpenAI
 
client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1"
)
 
stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "Write a short poem about coding"}],
    stream=True
)
 
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
 
print()  # New line at end

Async Streaming

from openai import AsyncOpenAI
import asyncio
 
client = AsyncOpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1"
)
 
async def stream_response():
    stream = await client.chat.completions.create(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Explain recursion"}],
        stream=True
    )
 
    async for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
 
asyncio.run(stream_response())

Collecting Full Response

from openai import OpenAI
 
client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1"
)
 
stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "Write a haiku"}],
    stream=True
)
 
full_response = ""
for chunk in stream:
    if chunk.choices[0].delta.content:
        content = chunk.choices[0].delta.content
        print(content, end="", flush=True)
        full_response += content
 
print(f"\n\nFull response: {full_response}")

JavaScript Streaming

Basic Streaming

import OpenAI from 'openai';
 
const client = new OpenAI({
  apiKey: 'agnic_tok_YOUR_TOKEN',
  baseURL: 'https://api.agnic.ai/v1'
});
 
const stream = await client.chat.completions.create({
  model: 'openai/gpt-4o',
  messages: [{ role: 'user', content: 'Write a poem about JavaScript' }],
  stream: true
});
 
for await (const chunk of stream) {
  const content = chunk.choices[0]?.delta?.content;
  if (content) {
    process.stdout.write(content);
  }
}

Browser (React Example)

import OpenAI from 'openai';
import { useState } from 'react';
 
function ChatComponent() {
  const [response, setResponse] = useState('');
  const [loading, setLoading] = useState(false);
 
  const handleSubmit = async (prompt: string) => {
    setLoading(true);
    setResponse('');
 
    const client = new OpenAI({
      apiKey: 'agnic_tok_YOUR_TOKEN',
      baseURL: 'https://api.agnic.ai/v1',
      dangerouslyAllowBrowser: true  // Only for demos!
    });
 
    const stream = await client.chat.completions.create({
      model: 'openai/gpt-4o',
      messages: [{ role: 'user', content: prompt }],
      stream: true
    });
 
    for await (const chunk of stream) {
      const content = chunk.choices[0]?.delta?.content;
      if (content) {
        setResponse(prev => prev + content);
      }
    }
 
    setLoading(false);
  };
 
  return (
    <div>
      <pre>{response}</pre>
      {loading && <span>Generating...</span>}
    </div>
  );
}

Never expose API tokens in browser code in production. Use a backend proxy instead.
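A minimal sketch of such a proxy, assuming a Python backend built with FastAPI (the /api/chat route and request shape are illustrative, not part of the Agnic API): the server keeps the token and relays the streamed text to the browser.

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI
from pydantic import BaseModel

app = FastAPI()

# The token lives only on the server; the browser never sees it
client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1",
)

class ChatRequest(BaseModel):
    prompt: str

@app.post("/api/chat")
def chat(req: ChatRequest):
    def generate():
        stream = client.chat.completions.create(
            model="openai/gpt-4o",
            messages=[{"role": "user", "content": req.prompt}],
            stream=True,
        )
        for chunk in stream:
            content = chunk.choices[0].delta.content
            if content:
                yield content  # relay plain-text tokens to the browser

    return StreamingResponse(generate(), media_type="text/plain")

The browser then calls fetch('/api/chat', ...) and reads response.body with a ReadableStream reader, so no token (and no dangerouslyAllowBrowser flag) ever ships to the client.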


cURL Streaming

curl https://api.agnic.ai/v1/chat/completions \
  -H "Authorization: Bearer agnic_tok_YOUR_TOKEN" \
  -H "Content-Type: application/json" \
  -N \
  -d '{
    "model": "openai/gpt-4o",
    "messages": [{"role": "user", "content": "Write a story"}],
    "stream": true
  }'

The -N (--no-buffer) flag disables output buffering so chunks are printed as they arrive.


Stream Event Format

Each chunk follows the Server-Sent Events (SSE) format:

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{"content":"!"},"finish_reason":null}]}

data: {"id":"chatcmpl-123","object":"chat.completion.chunk","model":"openai/gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}

data: [DONE]
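If you are not using an SDK, you can consume this SSE stream directly. A minimal sketch using httpx (an assumption; any HTTP client that supports response streaming works the same way):

import json
import httpx

payload = {
    "model": "openai/gpt-4o",
    "messages": [{"role": "user", "content": "Write a story"}],
    "stream": True,
}

with httpx.stream(
    "POST",
    "https://api.agnic.ai/v1/chat/completions",
    headers={"Authorization": "Bearer agnic_tok_YOUR_TOKEN"},
    json=payload,
    timeout=None,
) as response:
    response.raise_for_status()
    for line in response.iter_lines():
        if not line.startswith("data: "):
            continue  # skip blank separator lines between events
        data = line[len("data: "):]
        if data == "[DONE]":
            break  # sentinel marking the end of the stream
        chunk = json.loads(data)
        choices = chunk.get("choices") or []
        if choices and choices[0]["delta"].get("content"):
            print(choices[0]["delta"]["content"], end="", flush=True)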

Chunk Structure

{
  "id": "chatcmpl-123",
  "object": "chat.completion.chunk",
  "model": "openai/gpt-4o",
  "choices": [{
    "index": 0,
    "delta": {
      "content": "Hello"
    },
    "finish_reason": null
  }]
}

Final Chunk

The last chunk has finish_reason set and an empty delta:

{
  "choices": [{
    "delta": {},
    "finish_reason": "stop"
  }]
}

Handling Stream Events

Python with Events

from openai import OpenAI
 
client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1"
)
 
stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "Count to 5"}],
    stream=True
)
 
for chunk in stream:
    choice = chunk.choices[0]
 
    # Check if content
    if choice.delta.content:
        print(f"Content: {choice.delta.content}")
 
    # Check if done
    if choice.finish_reason:
        print(f"Finished: {choice.finish_reason}")

Canceling Streams

Python

from openai import OpenAI
 
client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1"
)
 
stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "Write a very long story"}],
    stream=True
)
 
token_count = 0
for chunk in stream:
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
        token_count += 1
 
    # Cancel after ~100 chunks (each chunk typically carries one token)
    if token_count > 100:
        print("\n\n[Cancelled]")
        break

JavaScript

const controller = new AbortController();
 
const stream = await client.chat.completions.create({
  model: 'openai/gpt-4o',
  messages: [{ role: 'user', content: 'Write a long story' }],
  stream: true
}, { signal: controller.signal });
 
let tokenCount = 0;
try {
  for await (const chunk of stream) {
    if (chunk.choices[0]?.delta?.content) {
      process.stdout.write(chunk.choices[0].delta.content);
      tokenCount++;
    }
 
    if (tokenCount > 100) {
      controller.abort();
      console.log('\n\n[Cancelled]');
      break;
    }
  }
} catch (e) {
  if (e.name !== 'AbortError') throw e;
}

Error Handling in Streams

from openai import OpenAI, APIError, APIStatusError
 
client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1"
)
 
try:
    stream = client.chat.completions.create(
        model="openai/gpt-4o",
        messages=[{"role": "user", "content": "Hello"}],
        stream=True
    )
 
    for chunk in stream:
        if chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="")
 
except APIStatusError as e:
    if e.status_code == 402:
        print("Payment required - check your balance")
    else:
        print(f"Stream error ({e.status_code}): {e}")
except APIError as e:
    # Connection failures and other non-HTTP errors carry no status code
    print(f"Stream error: {e}")

Best Practices

  1. Always handle partial responses - Streams can disconnect mid-response
  2. Implement timeouts - Don't wait forever for chunks (see the sketch after this list)
  3. Show loading state - Indicate when waiting for first chunk
  4. Buffer for display - Some UI frameworks work better with small batches
  5. Track usage - Final chunk may include token usage info
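A minimal sketch tying points 1-4 together (the 30-second timeout and batch size of 5 are illustrative values, not gateway requirements):

from openai import OpenAI

client = OpenAI(
    api_key="agnic_tok_YOUR_TOKEN",
    base_url="https://api.agnic.ai/v1",
    timeout=30.0,  # client-side timeout for the connection and each read between chunks
)

stream = client.chat.completions.create(
    model="openai/gpt-4o",
    messages=[{"role": "user", "content": "Write a short story"}],
    stream=True,
)

full_response = ""  # keep the partial text in case the stream disconnects
buffer = []         # update the display in small batches rather than per token

try:
    for chunk in stream:
        content = chunk.choices[0].delta.content
        if content:
            full_response += content
            buffer.append(content)
        if len(buffer) >= 5:
            print("".join(buffer), end="", flush=True)
            buffer.clear()
finally:
    # Flush whatever arrived, even if the connection dropped mid-response
    print("".join(buffer), end="", flush=True)
    print()

The timeout here is enforced client-side by the SDK's HTTP layer and covers both the initial connection and each read between chunks.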
