Observability¶
The cogent.observability module provides real-time visibility into agent execution.
Quick Start¶
Pass a level string directly to `observer=` — no import needed for the common case:
```python
agent = Agent(
    name="Assistant",
    model="gpt-5.4-mini",
    tools=[my_tool],
    observer="progress",  # "off" | "progress" | "debug" | "trace"
)
result = await agent.run("Do something useful")
```
Use Observer directly when you need subscriptions, custom sinks, a shared observer, or event capture:
```python
from cogent.observability import Observer

observer = Observer(level="progress")
agent = Agent(name="Assistant", model="gpt-5.4-mini", observer=observer)
```
Output Levels¶
Levels are cumulative: each level includes everything the less verbose levels emit.
| Level | What you see |
|---|---|
| `"off"` | Nothing |
| `"progress"` | Agent lifecycle · tool calls and results · subagent calls · streaming start/end |
| `"debug"` | Progress + LLM request/response, token counts, reasoning content, trace IDs, no truncation |
| `"trace"` | Debug (reserved for future fine-grained instrumentation) |
Example output at "progress"¶
```
[Assistant] [user-input] 8552e158
Do some math.
[Assistant] [tool-decision]
calculate
[Assistant] [tool-call] a1b2c3d4 calculate
{x=6, y=7}
[Assistant] [tool-result] a1b2c3d4 calculate
42
[Assistant] [agent-completed] (2.0s) • 330 tokens
The answer is 42.
```
Example output at "debug"¶
```
[2026-04-01 12:00:00.123] [Assistant] [user-input] 8552e158
Do some math.
[2026-04-01 12:00:00.124] [Assistant] [request] gpt-5.4-mini (1 msgs) • 1 tools
[2026-04-01 12:00:00.200] [Assistant] [tool-decision]
calculate
I should call the calculate tool with x=6, y=7.
[2026-04-01 12:00:00.201] [Assistant] [tool-call] a1b2c3d4 calculate
{x=6, y=7}
[2026-04-01 12:00:00.350] [Assistant] [tool-result] a1b2c3d4 calculate (149ms)
42
[2026-04-01 12:00:00.351] [Assistant] [response] (228ms) • 330 tokens
[2026-04-01 12:00:00.352] [Assistant] [agent-completed] (228ms) • 330 tokens
The answer is 42.
```
MCP and A2A source labels¶
The console appends `@server` or `@host:port` directly to the tool or subagent name so the origin is visible at a glance without extra noise:
```
[coordinator] [subagent-call] 3acc978c analyst@localhost:10088
Calculate 15% of 340 and provide the result clearly.
[coordinator] [subagent-result] 3acc978c analyst@localhost:10088
'15% of 340 is 51.'
[researcher] [tool-call] 7f1a2b3c web_search@search
{'query': 'Python async best practices'}
[researcher] [tool-result] 7f1a2b3c web_search@search
'Use asyncio.gather for concurrent tasks...'
```
- `name@host:port` — subagent backed by an `A2AAgent` remote endpoint
- `name@server` — tool sourced from an MCP server (the server's `name=` from `MCP.stdio(name=...)`)
- No suffix — local subagent or built-in / capability tool
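If you need the origin programmatically rather than visually, the suffix convention above is easy to parse back out. A minimal sketch using plain string splitting (the helper name `parse_origin` is hypothetical, not a cogent API):

```python
def parse_origin(label):
    """Split 'name@source' into (name, source); source is None for local tools."""
    name, sep, source = label.partition("@")
    return (name, source if sep else None)

print(parse_origin("analyst@localhost:10088"))  # → ('analyst', 'localhost:10088')
print(parse_origin("web_search@search"))        # → ('web_search', 'search')
print(parse_origin("calculate"))                # → ('calculate', None)
```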
Post-run Event Inspection¶
Every result carries the events emitted during the run:
```python
result = await agent.run("Do something")

# All events
result.events

# Filter by type — supports glob
errors = result.events_of("tool.error")
llm_reqs = result.events_of("llm.*")
```
This does not require capture configuration — events are always stored on the result.
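The glob patterns accepted by `events_of` behave like shell-style wildcards. A minimal model of that filtering with the standard library's `fnmatch` (illustrative only; the sample event types are drawn from the reference below, and this is not cogent's implementation):

```python
from fnmatch import fnmatch

# Event types as they might appear on result.events, in emission order.
event_types = [
    "agent.invoked",
    "llm.request",
    "tool.called",
    "tool.result",
    "llm.response",
    "agent.responded",
]

def events_of(types, pattern):
    """Return the types matching a glob pattern, preserving emission order."""
    return [t for t in types if fnmatch(t, pattern)]

print(events_of(event_types, "llm.*"))   # → ['llm.request', 'llm.response']
print(events_of(event_types, "tool.*"))  # → ['tool.called', 'tool.result']
```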
Observer API¶
Subscribing to Events¶
```python
from cogent.observability import Observer

observer = Observer(level="progress")

# Subscribe to a specific type
observer.on("tool.called", lambda e: print(f"tool: {e.data['tool_name']}"))

# Subscribe to a glob pattern
observer.on("tool.*", lambda e: print(f"{e.type}: {e.data}"))

# Subscribe to all events
observer.on_all(lambda e: print(e.type))

# on() returns an unsubscribe callable
unsub = observer.on("agent.*", handler)
unsub()
```
Event Capture and History¶
`observer.history()` only returns events that matched a `capture=` pattern at construction time. Use it when you want a filtered post-run log separate from `result.events`.
```python
observer = Observer(
    level="progress",
    capture=["tool.result", "agent.*"],
)

await agent.run("Do something")

for event in observer.history("tool.*"):
    print(event.type, event.data["tool_name"])

observer.clear_history()
```
Summary¶
Call `observer.summary()` after one or more runs for an aggregate report of what the observer recorded; Sharing an Observer Across Agents below shows it in use.
Dynamic Configuration¶
Emitting Custom Events¶
Use your own namespace (e.g. `my_app.*`) rather than Cogent's built-in names.
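To see why a separate namespace avoids collisions with built-in subscriptions, here is a toy dispatcher in the spirit of `Observer.on` — purely illustrative; the `emit` function and handler wiring are assumptions, not cogent's API:

```python
from fnmatch import fnmatch

subscriptions = []  # (pattern, handler) pairs

def on(pattern, handler):
    subscriptions.append((pattern, handler))

def emit(event_type, data):
    """Deliver an event to every handler whose glob pattern matches."""
    for pattern, handler in subscriptions:
        if fnmatch(event_type, pattern):
            handler(event_type, data)

seen = []
on("tool.*", lambda t, d: seen.append(("builtin", t)))
on("my_app.*", lambda t, d: seen.append(("custom", t)))

# A custom event in your own namespace reaches only your handler:
emit("my_app.cache.hit", {"key": "q1"})
print(seen)  # → [('custom', 'my_app.cache.hit')]
```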
Sharing an Observer Across Agents¶
One observer can track multiple agents. Output is tagged with the agent name.
```python
observer = Observer(level="progress")

researcher = Agent(name="Researcher", model=..., observer=observer)
writer = Agent(name="Writer", model=..., observer=observer)

await researcher.run("Research AI trends")
await writer.run("Write summary")

print(observer.summary())
```
```
[Researcher] [user-input] abc123de
Research AI trends
[Researcher] [tool-call] abc123de search
{query='AI trends'}
[Researcher] [tool-result] abc123de search
'Latest trends in AI...'
[Researcher] [agent-completed] (2.1s) • 250 tokens
Here are the key AI trends...
[Writer] [user-input] def456gh
Write summary
[Writer] [agent-completed] (1.5s) • 180 tokens
Here is a summary...
```
Event Reference¶
Event Shape¶
All built-in events are immutable Event records:
| Field | Meaning |
|---|---|
| `type` | String name such as `tool.called` |
| `data` | Payload dictionary |
| `timestamp` | UTC timestamp |
| `source` | Emitting agent or component |
| `correlation_id` | Optional correlation ID |
| `event_id` | Unique event ID |
Built-in Event Types¶
| Event | Level | Description |
|---|---|---|
| `agent.invoked` | PROGRESS | Agent execution started |
| `agent.thinking` | PROGRESS | Thinking step / loop iteration |
| `agent.responded` | PROGRESS | Final response produced |
| `agent.error` | PROGRESS | Agent or validation failure |
| `tool.called` | PROGRESS | Tool invocation started |
| `tool.result` | PROGRESS | Tool completed successfully |
| `tool.retry` | PROGRESS | Tool call failed; framework will retry (one event per failed attempt) |
| `tool.error` | PROGRESS | All retries exhausted — error returned to caller |
| `tool.escalated` | PROGRESS | All retries exhausted with `on_exhaustion="ask_agent"` — error handed to LLM |
| `subagent.called` | PROGRESS | Subagent delegation started |
| `subagent.result` | PROGRESS | Subagent completed |
| `subagent.error` | PROGRESS | Subagent failed |
| `stream.start` | PROGRESS | Streaming started |
| `stream.end` | PROGRESS | Streaming completed |
| `stream.error` | PROGRESS | Streaming failed |
| `output.generated` | PROGRESS | Structured output produced |
| `llm.request` | DEBUG | Request sent to the model |
| `llm.response` | DEBUG | Model response metadata |
| `llm.thinking` | DEBUG | Extended reasoning/thinking tokens |
| `agent.reasoning` | DEBUG | Explicit reasoning phase |
| `agent.acting` | DEBUG | Tool execution phase |
Common Payload Fields¶
| Family | Fields |
|---|---|
| `agent.*` | `agent_name`, `agent_id`, `run_id`, `step_id`, `iteration`, `duration_ms` |
| `tool.*` | `tool_name`, `call_id`, `args`, `result`, `error`, `attempts`, `tool_source` |
| `tool.retry` extra | `attempt` (1-based retry number), `max_retries`, `error_type`, `retry_delay` |
| `subagent.*` | `subagent_name`, `call_id`, `run_id`, `subagent_run_id`, `subagent_type` (`"local"` or `"a2a"`), `subagent_url` (A2A only), `task` |
| `llm.*` | `agent_name`, `model`, `iteration`, token counts, thinking content |
| `stream.*` | `agent_name`, token or preview fields |
Run Lineage¶
Built-in events carry lineage fields for reconstructing a run:
| Field | Meaning |
|---|---|
| `run_id` | Stable ID for one agent invocation |
| `parent_run_id` | Parent invocation ID for nested/delegated runs |
| `step_id` | Step within a run (`step-1`, `reasoning-2`) |
| `tool_call_id` | Per-invocation ID on `tool.*` and `subagent.*` events |
```python
observer = Observer(level="debug", capture=["agent.*", "subagent.*"])
agent = Agent(name="Assistant", model="gpt-5.4-mini", observer=observer)

await agent.run("Check the shipping quote")

for event in observer.history("subagent.*"):
    print(event.type, event.data["run_id"], event.data["tool_call_id"])
```
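Because every event carries `run_id` and `parent_run_id`, the delegation tree can be rebuilt after the fact. A sketch over plain dicts, with the lineage data shaped as the table above suggests (the sample values are invented for illustration):

```python
from collections import defaultdict

# Minimal lineage data as it might appear in event payloads (illustrative).
events = [
    {"type": "agent.invoked",   "run_id": "r1", "parent_run_id": None},
    {"type": "subagent.called", "run_id": "r1", "parent_run_id": None},
    {"type": "agent.invoked",   "run_id": "r2", "parent_run_id": "r1"},
    {"type": "agent.responded", "run_id": "r2", "parent_run_id": "r1"},
    {"type": "agent.responded", "run_id": "r1", "parent_run_id": None},
]

def build_tree(events):
    """Map each run to the set of runs it spawned."""
    children = defaultdict(set)
    for e in events:
        if e["parent_run_id"] is not None:
            children[e["parent_run_id"]].add(e["run_id"])
    return dict(children)

print(build_tree(events))  # → {'r1': {'r2'}}
```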
Sinks¶
By default the observer writes to stderr. Add custom sinks to route events elsewhere:
```python
import sys

from cogent.observability import Observer, ConsoleSink, FileSink, CallbackSink

observer = Observer(level="progress")
observer.add_sink(ConsoleSink(stream=sys.stdout))  # redirect to stdout
observer.add_sink(FileSink("agent.log"))           # write to file
observer.add_sink(CallbackSink(                    # custom handler
    lambda event, formatted: send_to_datadog(formatted)
))
```
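`CallbackSink` hands your callable each event alongside its already-formatted console line. A sketch of a handler that serializes events as JSON lines instead — the attribute names follow the Event Shape table, but the stand-in record here is an assumption, not a real cogent event:

```python
import json
from types import SimpleNamespace

def to_json_line(event, formatted):
    """Serialize one event as a JSON line (ignores the console-formatted string)."""
    return json.dumps({
        "type": event.type,
        "source": event.source,
        "data": event.data,
    })

# Stand-in for a real Event record:
event = SimpleNamespace(
    type="tool.result",
    source="Assistant",
    data={"tool_name": "calculate", "result": 42},
)
print(to_json_line(event, "[Assistant] [tool-result] calculate"))
```

Registered as `observer.add_sink(CallbackSink(to_json_line))`, a handler like this would give you one machine-parseable line per event.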
Metrics¶
Collect counters, gauges, and histograms alongside event observability:
```python
from cogent.observability import MetricsCollector

collector = MetricsCollector()

requests = collector.counter("requests_total", "Total requests")
requests.inc()

active = collector.gauge("active_agents", "Currently active")
active.set(3)
active.inc()
active.dec()

latency = collector.histogram(
    "request_latency_ms",
    "Request latency",
    buckets=[10, 50, 100, 500, 1000],
)
latency.observe(42.5)

print(collector.to_dict())
```
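The histogram buckets are upper bounds. A minimal sketch of how `observe()` might assign a value to a bucket, with a final overflow bucket for values beyond all bounds — an assumption about cogent's semantics, not its implementation:

```python
import bisect

class Histogram:
    """Toy histogram: counts observations per upper-bound bucket."""

    def __init__(self, buckets):
        self.bounds = sorted(buckets)
        # One counter per bound, plus a final overflow bucket.
        self.counts = [0] * (len(self.bounds) + 1)

    def observe(self, value):
        # Index of the first upper bound >= value; overflow lands last.
        self.counts[bisect.bisect_left(self.bounds, value)] += 1

h = Histogram(buckets=[10, 50, 100, 500, 1000])
h.observe(42.5)   # falls in the <=50 bucket
h.observe(2000)   # beyond all bounds → overflow bucket
print(h.counts)   # → [0, 1, 0, 0, 0, 1]
```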
Distributed Tracing¶
```python
from cogent.observability import Tracer, SpanKind

tracer = Tracer(service_name="my-agent")

async with tracer.span("process_request", kind=SpanKind.SERVER) as span:
    span.set_attribute("user_id", "123")

    async with tracer.span("call_llm") as llm_span:
        llm_span.set_attribute("model", "gpt-5.4")
        response = await llm.invoke(...)
```
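The span mechanics can be sketched as an async context manager that records attributes and wall-clock duration, in the spirit of `Tracer.span` — illustrative only; the record shape and helper names are assumptions:

```python
import asyncio
import time
from contextlib import asynccontextmanager

@asynccontextmanager
async def span(name, spans):
    """Record a named span's attributes and duration into `spans` on exit."""
    record = {"name": name, "attributes": {}, "start": time.monotonic()}

    class _Span:
        def set_attribute(self, key, value):
            record["attributes"][key] = value

    try:
        yield _Span()
    finally:
        record["duration_ms"] = (time.monotonic() - record["start"]) * 1000
        spans.append(record)

async def main():
    spans = []
    async with span("process_request", spans) as s:
        s.set_attribute("user_id", "123")
        await asyncio.sleep(0)  # stand-in for real work
    return spans

spans = asyncio.run(main())
print(spans[0]["name"], spans[0]["attributes"])
```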