Observability

The cogent.observability module provides real-time visibility into agent execution.

Quick Start

Pass a level string directly to observer= — no import needed for the common case:

agent = Agent(
    name="Assistant",
    model="gpt-5.4-mini",
    tools=[my_tool],
    observer="progress",   # "off" | "progress" | "debug" | "trace"
)

result = await agent.run("Do something useful")

Use Observer directly when you need subscriptions, custom sinks, a shared observer, or event capture:

from cogent.observability import Observer

observer = Observer(level="progress")
agent = Agent(name="Assistant", model="gpt-5.4-mini", observer=observer)

Output Levels

Each level is a strict superset of the previous one: moving down the table only adds output.

Level What you see
"off" Nothing
"progress" Agent lifecycle · tool calls and results · subagent calls · streaming start/end
"debug" Progress + LLM request/response, token counts, reasoning content, trace IDs, no truncation
"trace" Debug (reserved for future fine-grained instrumentation)

Example output at "progress"

[Assistant] [user-input] 8552e158
  Do some math.
[Assistant] [tool-decision]
  calculate
[Assistant] [tool-call] a1b2c3d4 calculate
  {x=6, y=7}
[Assistant] [tool-result] a1b2c3d4 calculate
  42
[Assistant] [agent-completed] (2.0s) • 330 tokens
  The answer is 42.

Example output at "debug"

[2026-04-01 12:00:00.123] [Assistant] [user-input] 8552e158
  Do some math.
[2026-04-01 12:00:00.124] [Assistant] [request] gpt-5.4-mini (1 msgs) • 1 tools
[2026-04-01 12:00:00.200] [Assistant] [tool-decision]
  calculate
  I should call the calculate tool with x=6, y=7.
[2026-04-01 12:00:00.201] [Assistant] [tool-call] a1b2c3d4 calculate
  {x=6, y=7}
[2026-04-01 12:00:00.350] [Assistant] [tool-result] a1b2c3d4 calculate (149ms)
  42
[2026-04-01 12:00:00.351] [Assistant] [response] (228ms) • 330 tokens
[2026-04-01 12:00:00.352] [Assistant] [agent-completed] (228ms) • 330 tokens
  The answer is 42.

MCP and A2A source labels

The console appends @server or @host:port directly to the tool or subagent name so the origin is visible at a glance without extra noise:

[coordinator] [subagent-call] 3acc978c analyst@localhost:10088
  Calculate 15% of 340 and provide the result clearly.
[coordinator] [subagent-result] 3acc978c analyst@localhost:10088
  '15% of 340 is 51.'

[researcher] [tool-call] 7f1a2b3c web_search@search
  {'query': 'Python async best practices'}
[researcher] [tool-result] 7f1a2b3c web_search@search
  'Use asyncio.gather for concurrent tasks...'

  • name@host:port — subagent backed by an A2AAgent remote endpoint
  • name@server — tool sourced from an MCP server (the server's name= from MCP.stdio(name=...))
  • No suffix — local subagent or built-in / capability tool
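The suffix convention can be split back into name and origin with a small helper (hypothetical code, not part of the cogent API):

```python
def split_source(label: str):
    """Split a console label like 'web_search@search' or
    'analyst@localhost:10088' into (name, origin). A label without
    '@' is a local tool or subagent, so origin is None."""
    name, sep, origin = label.partition("@")
    return (name, origin if sep else None)

print(split_source("web_search@search"))        # ('web_search', 'search')
print(split_source("analyst@localhost:10088"))  # ('analyst', 'localhost:10088')
print(split_source("calculate"))                # ('calculate', None)
```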

Post-run Event Inspection

Every result carries the events emitted during the run:

result = await agent.run("Do something")

# All events
result.events

# Filter by type — supports glob
errors   = result.events_of("tool.error")
llm_reqs = result.events_of("llm.*")

This does not require capture configuration — events are always stored on the result.
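Conceptually, the glob patterns behave like shell-style wildcards; a sketch of the filtering using stdlib fnmatch (the library's actual matcher may differ):

```python
from fnmatch import fnmatch

event_types = ["llm.request", "llm.response", "tool.called", "tool.error"]

def filter_events(types, pattern):
    # Keep only event types matching the shell-style glob.
    return [t for t in types if fnmatch(t, pattern)]

print(filter_events(event_types, "llm.*"))       # ['llm.request', 'llm.response']
print(filter_events(event_types, "tool.error"))  # ['tool.error']
```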


Observer API

Subscribing to Events

from cogent.observability import Observer

observer = Observer(level="progress")

# Subscribe to a specific type
observer.on("tool.called", lambda e: print(f"tool: {e.data['tool_name']}"))

# Subscribe to a glob pattern
observer.on("tool.*", lambda e: print(f"{e.type}: {e.data}"))

# Subscribe to all events
observer.on_all(lambda e: print(e.type))

# Unsubscribe
unsub = observer.on("agent.*", handler)
unsub()

Event Capture and History

observer.history() only returns events that matched a capture= pattern at construction time. Use it when you want a filtered post-run log separate from result.events.

observer = Observer(
    level="progress",
    capture=["tool.result", "agent.*"],
)

await agent.run("Do something")

for event in observer.history("tool.*"):
    print(event.type, event.data["tool_name"])

observer.clear_history()

Summary

print(observer.summary())
# Events: 10
#   agent: 6
#   tool: 4
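The per-family counts group event types by their first dotted segment; a minimal sketch of that grouping:

```python
from collections import Counter

event_types = ["agent.invoked", "agent.thinking", "tool.called",
               "tool.result", "agent.responded"]

# Count events per top-level family ("agent", "tool", ...).
families = Counter(t.split(".", 1)[0] for t in event_types)
print(dict(families))  # {'agent': 3, 'tool': 2}
```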

Dynamic Configuration

observer.enabled = False    # pause output
observer.level = "debug"    # change level mid-run

Emitting Custom Events

observer.emit("my_app.order.placed", order_id="123", amount=49.99)

Use your own namespace (e.g. my_app.*) rather than Cogent's built-in names.


Sharing an Observer Across Agents

One observer can track multiple agents. Output is tagged with the agent name.

observer = Observer(level="progress")

researcher = Agent(name="Researcher", model=..., observer=observer)
writer     = Agent(name="Writer",     model=..., observer=observer)

await researcher.run("Research AI trends")
await writer.run("Write summary")

Console output during the runs is tagged with each agent's name:
[Researcher] [user-input] abc123de
  Research AI trends
[Researcher] [tool-call] abc123de search
  {query='AI trends'}
[Researcher] [tool-result] abc123de search
  'Latest trends in AI...'
[Researcher] [agent-completed] (2.1s) • 250 tokens
  Here are the key AI trends...
[Writer] [user-input] def456gh
  Write summary
[Writer] [agent-completed] (1.5s) • 180 tokens
  Here is a summary...

Event Reference

Event Shape

All built-in events are immutable Event records:

Field Meaning
type String name such as tool.called
data Payload dictionary
timestamp UTC timestamp
source Emitting agent or component
correlation_id Optional correlation ID
event_id Unique event ID
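The fields above map naturally onto a frozen dataclass; an illustrative model of the record shape (the defaults here are assumptions, not the library's definition):

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional
import uuid

@dataclass(frozen=True)
class Event:
    type: str          # string name such as "tool.called"
    data: dict         # payload dictionary
    timestamp: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc))
    source: str = ""   # emitting agent or component
    correlation_id: Optional[str] = None
    event_id: str = field(default_factory=lambda: uuid.uuid4().hex)

e = Event(type="tool.called", data={"tool_name": "calculate"}, source="Assistant")
print(e.type, e.data["tool_name"])  # tool.called calculate
```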

Built-in Event Types

Event Level Description
agent.invoked PROGRESS Agent execution started
agent.thinking PROGRESS Thinking step / loop iteration
agent.responded PROGRESS Final response produced
agent.error PROGRESS Agent or validation failure
tool.called PROGRESS Tool invocation started
tool.result PROGRESS Tool completed successfully
tool.retry PROGRESS Tool call failed; framework will retry (one event per failed attempt)
tool.error PROGRESS All retries exhausted — error returned to caller
tool.escalated PROGRESS All retries exhausted with on_exhaustion="ask_agent" — error handed to LLM
subagent.called PROGRESS Subagent delegation started
subagent.result PROGRESS Subagent completed
subagent.error PROGRESS Subagent failed
stream.start PROGRESS Streaming started
stream.end PROGRESS Streaming completed
stream.error PROGRESS Streaming failed
output.generated PROGRESS Structured output produced
llm.request DEBUG Request sent to the model
llm.response DEBUG Model response metadata
llm.thinking DEBUG Extended reasoning/thinking tokens
agent.reasoning DEBUG Explicit reasoning phase
agent.acting DEBUG Tool execution phase

Common Payload Fields

Family Fields
agent.* agent_name, agent_id, run_id, step_id, iteration, duration_ms
tool.* tool_name, call_id, args, result, error, attempts, tool_source
tool.retry extra attempt (1-based retry number), max_retries, error_type, retry_delay
subagent.* subagent_name, call_id, run_id, subagent_run_id, subagent_type ("local" or "a2a"), subagent_url (A2A only), task
llm.* agent_name, model, iteration, token counts, thinking content
stream.* agent_name, token or preview fields

Run Lineage

Built-in events carry lineage fields for reconstructing a run:

Field Meaning
run_id Stable ID for one agent invocation
parent_run_id Parent invocation ID for nested/delegated runs
step_id Step within a run (step-1, reasoning-2)
tool_call_id Per-invocation ID on tool.* and subagent.* events

For example, capture lineage fields and inspect them after a run:
observer = Observer(level="debug", capture=["agent.*", "subagent.*"])
agent = Agent(name="Assistant", model="gpt-5.4-mini", observer=observer)

await agent.run("Check the shipping quote")

for event in observer.history("subagent.*"):
    print(event.type, event.data["run_id"], event.data["tool_call_id"])
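With run_id and parent_run_id in hand, a nested run tree can be rebuilt from captured payloads; a sketch over hypothetical plain dicts (not the cogent API):

```python
from collections import defaultdict

# Hypothetical captured payloads carrying lineage fields.
runs = [
    {"run_id": "r1", "parent_run_id": None},   # top-level invocation
    {"run_id": "r2", "parent_run_id": "r1"},   # delegated subagent run
    {"run_id": "r3", "parent_run_id": "r1"},   # second delegated run
]

# Group child run IDs under their parent.
children = defaultdict(list)
for run in runs:
    children[run["parent_run_id"]].append(run["run_id"])

print(children[None])  # roots: ['r1']
print(children["r1"])  # runs delegated by r1: ['r2', 'r3']
```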

Sinks

By default the observer writes to stderr. Add custom sinks to route events elsewhere:

from cogent.observability import Observer, ConsoleSink, FileSink, CallbackSink
import sys

observer = Observer(level="progress")

observer.add_sink(ConsoleSink(stream=sys.stdout))   # redirect to stdout
observer.add_sink(FileSink("agent.log"))             # write to file
observer.add_sink(CallbackSink(                      # custom handler
    lambda event, formatted: send_to_datadog(formatted)
))
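A CallbackSink handler often serializes events for downstream systems; a sketch of a JSON-lines formatter over plain values (illustrative only — the real sink hands the callback an event plus its formatted text):

```python
import json

def to_jsonl(event_type: str, data: dict) -> str:
    # One compact JSON object per line, suitable for log shippers.
    return json.dumps({"type": event_type, **data}, sort_keys=True)

line = to_jsonl("tool.result", {"tool_name": "calculate", "result": 42})
print(line)  # {"result": 42, "tool_name": "calculate", "type": "tool.result"}
```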

Metrics

Collect counters, gauges, and histograms alongside event observability:

from cogent.observability import MetricsCollector

collector = MetricsCollector()

requests = collector.counter("requests_total", "Total requests")
requests.inc()

active = collector.gauge("active_agents", "Currently active")
active.set(3)
active.inc()
active.dec()

latency = collector.histogram(
    "request_latency_ms",
    "Request latency",
    buckets=[10, 50, 100, 500, 1000],
)
latency.observe(42.5)

print(collector.to_dict())
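Bucketed histograms typically place each observation in the first bucket whose upper bound is at least the value, with an overflow slot at the end; a sketch of that bucketing with bisect (the library's exact semantics are an assumption here):

```python
from bisect import bisect_left

buckets = [10, 50, 100, 500, 1000]
counts = [0] * (len(buckets) + 1)   # last slot is the overflow bucket

def observe(value: float) -> None:
    # First bucket with upper bound >= value; past the end means overflow.
    counts[bisect_left(buckets, value)] += 1

observe(42.5)   # lands in the <=50 bucket
observe(7)      # lands in the <=10 bucket
observe(2000)   # overflow bucket
print(counts)   # [1, 1, 0, 0, 0, 1]
```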

Distributed Tracing

from cogent.observability import Tracer, SpanKind

tracer = Tracer(service_name="my-agent")

async with tracer.span("process_request", kind=SpanKind.SERVER) as span:
    span.set_attribute("user_id", "123")

    async with tracer.span("call_llm") as llm_span:
        llm_span.set_attribute("model", "gpt-5.4")
        response = await llm.invoke(...)

Span Context Propagation

from cogent.observability import SpanContext

# Incoming context from another service
incoming = SpanContext.from_header(request.headers["traceparent"])
async with tracer.start_span("handle", context=incoming) as span:
    ...
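The traceparent header follows the W3C Trace Context format: four hyphen-separated fields (version, trace ID, span ID, flags). A sketch of pulling the IDs out (hypothetical helper, not the SpanContext implementation):

```python
def parse_traceparent(header: str) -> dict:
    """Split a W3C traceparent header into its four fields."""
    version, trace_id, span_id, flags = header.split("-")
    # Per the spec, the trace ID is 32 hex chars and the span ID is 16.
    assert len(trace_id) == 32 and len(span_id) == 16
    return {"version": version, "trace_id": trace_id,
            "span_id": span_id, "flags": flags}

ctx = parse_traceparent(
    "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01")
print(ctx["trace_id"])  # 4bf92f3577b34da6a3ce929d0e0e4736
```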