Streaming¶
Real-time token-by-token streaming from agent executions.
Overview¶
Streaming enables agents to yield output progressively as tokens are generated, rather than waiting for complete responses. This provides:
- Real-time feedback during long-running agent operations
- Better UX with progressive output display
- Lower perceived latency by showing immediate progress
- Cancellation support for in-flight operations
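Cancellation works through standard asyncio mechanics: stop iterating the stream, or cancel the task that is consuming it. A minimal sketch of the pattern (the agent setup mirrors the Quick Start below; the cancellation behaviour itself is plain asyncio, not a cogent-specific API):

import asyncio
from cogent import Agent

agent = Agent(name="writer", model="gpt-5.4")

async def stream_essay() -> None:
    # Consuming the async iterator; cancelling the surrounding task
    # stops the in-flight stream.
    async for chunk in agent.run_stream("Write a long essay about AI"):
        print(chunk.content, end="", flush=True)

async def main() -> None:
    task = asyncio.create_task(stream_essay())
    await asyncio.sleep(2)   # Let some tokens arrive
    task.cancel()            # Cancel the in-flight operation
    try:
        await task
    except asyncio.CancelledError:
        print("\n[Stream cancelled]")

asyncio.run(main())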
Quick Start¶
from cogent import Agent

agent = Agent(
    name="writer",
    model="gpt-5.4",
    stream=True,  # Enable streaming
)

# Stream tokens as they arrive
async for chunk in agent.run_stream("Write a poem about AI"):
    print(chunk.content, end="", flush=True)
Agent Streaming¶
Enabling Streaming¶
Two ways to enable streaming:
# Option 1: Set on agent creation
agent = Agent(
    name="writer",
    model="gpt-5.4",
    stream=True,
)

result = await agent.run("Write about AI")  # Still works normally

async for chunk in agent.run_stream("Write a poem"):  # Stream tokens
    print(chunk.content, end="", flush=True)

# Option 2: Stream on demand (any agent)
agent = Agent(name="writer", model="gpt-5.4")

async for chunk in agent.run_stream("Write a poem"):
    print(chunk.content, end="", flush=True)
StreamChunk¶
Each streaming chunk contains:
@dataclass
class StreamChunk:
    content: str               # Token content
    delta: str                 # Incremental text (same as content)
    is_final: bool             # Last chunk?
    finish_reason: str | None  # Why stopped (stop, length, tool_calls)
    metadata: dict | None      # Token usage, model info
Key Properties:
- content — The token text
- delta — Incremental text (alias for content)
- is_final — True when streaming completes
- finish_reason — "stop" (complete), "length" (max tokens), "tool_calls" (tool invocation)
- metadata — Includes token usage when available
Streaming with Metadata¶
Access model metadata during streaming:
async for chunk in agent.run_stream("Analyze data"):
    print(chunk.content, end="")
    if chunk.is_final and chunk.metadata:
        print(f"\nModel: {chunk.metadata.get('model')}")
        print(f"Tokens: {chunk.metadata.get('tokens')}")
run_stream() vs run()¶
| Feature | run() | run_stream() |
|---|---|---|
| Returns | Response | AsyncIterator[StreamChunk] |
| Output | Complete final output | Progressive tokens |
| Latency | Wait for completion | Immediate feedback |
| Use Case | Batch processing | Interactive UX |
When to use streaming:
- Interactive applications (CLIs, web UIs, chatbots)
- Long-running agent operations
- Progress tracking and status updates
- User experience is a priority

When to use regular run():
- Batch processing or automation
- The final result is all that matters
- Simpler code (no async iteration)
Usage Patterns¶
Basic Streaming¶
from cogent import Agent

agent = Agent(name="assistant", model="gpt-5.4")

# Stream tokens as they arrive
async for chunk in agent.run_stream("Explain quantum computing"):
    print(chunk.content, end="", flush=True)
print()  # Newline at end
Collecting Streamed Output¶
full_response = []

async for chunk in agent.run_stream("Write a story"):
    full_response.append(chunk.content)
    print(chunk.content, end="", flush=True)

final_text = "".join(full_response)
Progress Tracking¶
async for chunk in agent.run_stream("Long analysis task"):
    print(chunk.content, end="", flush=True)
    if chunk.is_final:
        print("\n✅ Complete!")
Error Handling¶
try:
    async for chunk in agent.run_stream("Query"):
        print(chunk.content, end="", flush=True)
        if chunk.finish_reason == "length":
            print("\n⚠️ Response truncated (max tokens reached)")
except Exception as e:
    print(f"\n❌ Streaming error: {e}")
Streaming with Tools¶
When an agent calls tools during streaming, the stream may pause while tools execute:
from cogent import Agent, tool

@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

agent = Agent(name="researcher", model="gpt-5.4", tools=[search])

async for chunk in agent.run_stream("Search for AI news and summarize"):
    if chunk.content:
        print(chunk.content, end="", flush=True)
    if chunk.finish_reason == "tool_calls":
        print("\n[Tool calling...]")
Web UI Integration¶
FastAPI Streaming Endpoint¶
from cogent import Agent
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()
agent = Agent(name="assistant", model="gpt-5.4")

@app.post("/chat/stream")
async def chat_stream(message: str):
    async def generate():
        async for chunk in agent.run_stream(message):
            yield chunk.content
    return StreamingResponse(generate(), media_type="text/plain")
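On the client side, the plain-text stream can be consumed incrementally with any HTTP client that supports response streaming. A minimal sketch using httpx; the URL and the message query parameter mirror the endpoint above:

import asyncio
import httpx

async def consume_stream(message: str) -> None:
    async with httpx.AsyncClient() as client:
        # stream() exposes the response body incrementally instead of buffering it
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/stream",
            params={"message": message},
        ) as response:
            async for text in response.aiter_text():
                print(text, end="", flush=True)

asyncio.run(consume_stream("Write a haiku"))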
Server-Sent Events (SSE)¶
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse

app = FastAPI()  # agent defined as in the previous example

@app.post("/chat/sse")
async def chat_sse(message: str):
    async def event_stream():
        async for chunk in agent.run_stream(message):
            yield {"event": "token", "data": chunk.content}
        yield {"event": "done", "data": ""}
    return EventSourceResponse(event_stream())
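The SSE endpoint can also be consumed from Python. A rough sketch that reads the event stream with httpx and extracts data: lines by hand; a dedicated SSE client library is preferable in real code:

import asyncio
import httpx

async def consume_sse(message: str) -> None:
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST",
            "http://localhost:8000/chat/sse",
            params={"message": message},
        ) as response:
            async for line in response.aiter_lines():
                # SSE frames carry the payload on "data:" lines
                if line.startswith("data:"):
                    print(line.removeprefix("data:").lstrip(), end="", flush=True)

asyncio.run(consume_sse("Summarize today's AI news"))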
Model Streaming Support¶
All major model providers support streaming:
| Provider | Streaming Support |
|---|---|
| OpenAI | ✅ Full support |
| Anthropic | ✅ Full support |
| Gemini | ✅ Full support |
| Groq | ✅ Full support |
| Ollama | ✅ Full support |
| Azure OpenAI | ✅ Full support |
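Because every provider is reached through the same run_stream() interface, the consuming code does not change when you switch models. A sketch; the non-OpenAI model identifiers below are illustrative and depend on your provider configuration:

from cogent import Agent

async def compare_providers() -> None:
    # Same streaming loop for every provider; only the model string differs.
    # Identifiers other than "gpt-5.4" are illustrative placeholders.
    for model_name in ("gpt-5.4", "claude-sonnet-4-5", "gemini-2.5-pro"):
        agent = Agent(name="assistant", model=model_name)
        async for chunk in agent.run_stream("Say hello"):
            print(chunk.content, end="", flush=True)
        print()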
API Reference¶
Agent.run_stream()¶
async def run_stream(
    self,
    message: str,
    *,
    context: dict | None = None,
) -> AsyncIterator[StreamChunk]:
    """
    Stream agent response token-by-token.

    Args:
        message: The user message to process.
        context: Optional context dictionary.

    Yields:
        StreamChunk objects with progressive tokens.
    """
StreamChunk¶
@dataclass
class StreamChunk:
    content: str               # Token content
    delta: str                 # Alias for content
    is_final: bool             # True on last chunk
    finish_reason: str | None  # "stop", "length", "tool_calls"
    metadata: dict | None      # Model metadata
Best Practices¶
- Use end="" and flush=True — Ensure tokens display immediately
- Handle is_final — Add newline or summary after completion
- Check finish_reason — Detect truncation or tool calls
- Collect output — Append chunks for final text if needed
- Error handling — Wrap iteration in try/except for robustness
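These practices combine naturally into a small helper. A sketch that relies only on the run_stream() and StreamChunk behaviour documented above:

async def stream_to_console(agent, message: str) -> str:
    """Print tokens as they arrive and return the collected text."""
    parts: list[str] = []
    try:
        async for chunk in agent.run_stream(message):
            parts.append(chunk.content)
            print(chunk.content, end="", flush=True)  # Display tokens immediately
            if chunk.finish_reason == "length":
                print("\n⚠️ Response truncated (max tokens reached)")
            if chunk.is_final:
                print()  # Newline after completion
    except Exception as exc:
        print(f"\n❌ Streaming error: {exc}")
    return "".join(parts)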