Core Module¶
The cogent.core module provides foundational types, enums, utilities, and dependency injection used throughout the framework.
Overview¶
The core module defines:

- Enums for status types, event types, and roles
- Native message types compatible with all LLM providers
- Utility functions for IDs, timestamps, etc.
- RunContext for dependency injection (tools and interceptors)
- Reactive utilities for event-driven systems (idempotency, retries, delays)
```python
from cogent.core import (
    # Enums
    TaskStatus,
    AgentStatus,
    EventType,
    Priority,
    AgentRole,
    # Context
    RunContext,
    EMPTY_CONTEXT,
    # Utilities
    generate_id,
    now_utc,
    # Reactive utilities
    IdempotencyGuard,
    RetryBudget,
    emit_later,
    jittered_delay,
    Stopwatch,
)
```
Enums¶
TaskStatus¶
Task lifecycle states:
```python
from cogent.core import TaskStatus

status = TaskStatus.RUNNING

# Check state categories
status.is_terminal()  # COMPLETED, FAILED, CANCELLED
status.is_active()    # RUNNING, SPAWNING
```
| Status | Description |
|---|---|
| PENDING | Task created, not yet scheduled |
| SCHEDULED | Task scheduled for execution |
| BLOCKED | Task waiting on dependencies |
| RUNNING | Task actively executing |
| SPAWNING | Task creating subtasks |
| COMPLETED | Task finished successfully |
| FAILED | Task failed with error |
| CANCELLED | Task was cancelled |
AgentStatus¶
Agent lifecycle states:
```python
from cogent.core import AgentStatus

status = AgentStatus.THINKING

# Check state categories
status.is_available()  # Can accept new work (IDLE)
status.is_working()    # Currently working (THINKING, ACTING)
```
| Status | Description |
|---|---|
| IDLE | Agent ready for new work |
| THINKING | Agent reasoning/planning |
| ACTING | Agent executing tools |
| WAITING | Agent waiting for response |
| ERROR | Agent in error state |
| OFFLINE | Agent unavailable |
EventType¶
System events for observability and coordination:
Categories:
| Category | Examples |
|---|---|
| system | SYSTEM_STARTED, SYSTEM_STOPPED, SYSTEM_ERROR |
| task | TASK_CREATED, TASK_STARTED, TASK_COMPLETED, TASK_FAILED |
| subtask | SUBTASK_SPAWNED, SUBTASK_COMPLETED |
| agent | AGENT_THINKING, AGENT_ACTING, AGENT_RESPONDED |
| tool | TOOL_CALLED, TOOL_RESULT, TOOL_ERROR |
| llm | LLM_REQUEST, LLM_RESPONSE, LLM_TOOL_DECISION |
| stream | STREAM_START, TOKEN_STREAMED, STREAM_END |
| plan | PLAN_CREATED, PLAN_STEP_STARTED, PLAN_STEP_COMPLETED |
| message | MESSAGE_SENT, MESSAGE_RECEIVED |
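Most event type names in the table follow a `CATEGORY_ACTION` convention, so the category can usually be read off the name prefix. A minimal stdlib sketch of that convention (illustrative only, not part of the `cogent.core` API; note exceptions such as TOKEN_STREAMED, which belongs to the stream category despite its prefix):

```python
def event_category(event_name: str) -> str:
    """Return the lowercase category prefix of an event type name."""
    # Works for names that follow the CATEGORY_ACTION convention above.
    return event_name.split("_", 1)[0].lower()

print(event_category("TASK_COMPLETED"))  # task
print(event_category("LLM_REQUEST"))     # llm
```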
Priority¶
Task priority levels (comparable):
```python
from cogent.core import Priority

# Priorities are comparable
Priority.HIGH > Priority.NORMAL   # True
Priority.LOW < Priority.CRITICAL  # True
```
| Priority | Value | Description |
|---|---|---|
| LOW | 1 | Background tasks |
| NORMAL | 2 | Standard priority |
| HIGH | 3 | Important tasks |
| CRITICAL | 4 | Urgent tasks |
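Because priorities compare as integers, they order naturally in a priority queue. A sketch using the stdlib (an IntEnum stands in for cogent's Priority here; this is an illustration, not framework code):

```python
import heapq
from enum import IntEnum

class P(IntEnum):
    LOW = 1
    NORMAL = 2
    HIGH = 3
    CRITICAL = 4

# heapq pops the smallest item first, so negate for highest-priority-first.
queue: list[tuple[int, str]] = []
for prio, task in [(P.NORMAL, "report"), (P.CRITICAL, "outage"), (P.LOW, "cleanup")]:
    heapq.heappush(queue, (-prio, task))

order = [heapq.heappop(queue)[1] for _ in range(len(queue))]
print(order)  # ['outage', 'report', 'cleanup']
```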
AgentRole¶
Agent roles define capabilities:
```python
from cogent.core import AgentRole

role = AgentRole.SUPERVISOR
role.can_finish     # True
role.can_delegate   # True
role.can_use_tools  # False
```
| Role | can_finish | can_delegate | can_use_tools |
|---|---|---|---|
| WORKER | ❌ | ❌ | ✅ |
| SUPERVISOR | ✅ | ✅ | ❌ |
| AUTONOMOUS | ✅ | ❌ | ✅ |
| REVIEWER | ✅ | ❌ | ❌ |
Messages¶
Native message types compatible with OpenAI/Anthropic/etc. APIs:
SystemMessage¶
```python
from cogent.core.messages import SystemMessage

msg = SystemMessage("You are a helpful assistant.")
msg.to_dict()  # {"role": "system", "content": "..."}
```
HumanMessage¶
```python
from cogent.core.messages import HumanMessage

msg = HumanMessage("Hello, how are you?")
msg.to_dict()  # {"role": "user", "content": "..."}
```
AIMessage¶
```python
from cogent.core.messages import AIMessage

# Text response
msg = AIMessage(content="I'm doing well!")

# Response with tool calls
msg = AIMessage(
    content="",
    tool_calls=[
        {"id": "call_1", "name": "search", "args": {"query": "weather"}}
    ],
)
msg.to_dict()  # Includes properly formatted tool_calls
```
ToolMessage¶
```python
from cogent.core.messages import ToolMessage

msg = ToolMessage(
    content='{"temperature": 72}',
    tool_call_id="call_1",
)
```
Helper Functions¶
```python
from cogent.core.messages import (
    messages_to_dict,
    parse_openai_response,
)

# Convert message list for API calls
messages = [SystemMessage("..."), HumanMessage("...")]
api_messages = messages_to_dict(messages)

# Parse OpenAI response into AIMessage
ai_msg = parse_openai_response(openai_response)
```
Context (Dependency Injection)¶
RunContext¶
Base class for invocation-scoped context that provides dependency injection for tools and interceptors.
```python
from dataclasses import dataclass
from typing import Any

from cogent import Agent, tool
from cogent.core import RunContext

@dataclass
class AppContext(RunContext):
    user_id: str
    db: Any  # Your database connection
    api_key: str

@tool
def get_user_data(ctx: AppContext) -> str:
    """Get data for the current user."""
    user = ctx.db.get_user(ctx.user_id)
    return f"User: {user.name}"

agent = Agent(name="assistant", model=model, tools=[get_user_data])

# Pass context at invocation time
result = await agent.run(
    "Get my profile data",
    context=AppContext(user_id="123", db=db, api_key=key),
)
```
Key Features:
- Type-safe context data passed to tools and interceptors
- No global state — context scoped to single invocation
- Access via ctx: RunContext parameter in tools
- Available in interceptors via InterceptContext.run_context
Methods:
- get(key, default) — Get metadata value by key
- with_metadata(**kwargs) — Create new context with additional metadata
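The get / with_metadata semantics can be sketched with a plain frozen dataclass (illustrative only; the real RunContext lives in cogent.core and this sketch assumes with_metadata returns a new context rather than mutating the original):

```python
from dataclasses import dataclass, field, replace
from typing import Any

@dataclass(frozen=True)
class SketchContext:
    metadata: dict[str, Any] = field(default_factory=dict)

    def get(self, key: str, default: Any = None) -> Any:
        return self.metadata.get(key, default)

    def with_metadata(self, **kwargs: Any) -> "SketchContext":
        # Returns a new context; the original is never mutated.
        return replace(self, metadata={**self.metadata, **kwargs})

ctx = SketchContext()
ctx2 = ctx.with_metadata(request_id="req-42")
print(ctx2.get("request_id"))       # req-42
print(ctx.get("request_id", "-"))   # - (original unchanged)
```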
Reactive Utilities¶
Event-driven utilities for building robust reactive systems.
IdempotencyGuard¶
In-memory idempotency guard to ensure side-effects execute only once per key:
```python
from cogent.core import IdempotencyGuard

guard = IdempotencyGuard()

async def process_event(event_id: str, data: dict):
    if not await guard.claim(event_id):
        return  # Already processed
    # Process event exactly once
    await do_work(data)
```
Methods:
- claim(key: str) -> bool — Atomically claim a key (returns True if first time)
- run_once(key: str, fn: Callable) -> tuple[bool, Any] — Run coroutine exactly once per key
Note: Process-local memory. For distributed systems, back with Redis/database.
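The claim semantics above (first caller wins, duplicates are suppressed) can be sketched with a set behind an asyncio lock. This is an illustrative stand-in, not cogent's implementation:

```python
import asyncio

class SketchGuard:
    """Process-local, first-claim-wins idempotency sketch."""

    def __init__(self) -> None:
        self._seen: set[str] = set()
        self._lock = asyncio.Lock()

    async def claim(self, key: str) -> bool:
        # The lock makes check-and-add atomic across concurrent tasks.
        async with self._lock:
            if key in self._seen:
                return False
            self._seen.add(key)
            return True

async def main() -> None:
    guard = SketchGuard()
    print(await guard.claim("evt-1"))  # True  (first claim wins)
    print(await guard.claim("evt-1"))  # False (duplicate suppressed)

asyncio.run(main())
```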
RetryBudget¶
Bounded retry tracker for exponential backoff and retry policies:
```python
from cogent.core import RetryBudget

budget = RetryBudget.in_memory(max_attempts=3)

async def handle_with_retries(task_id: str):
    if not budget.can_retry(task_id):
        # Budget exhausted: escalate to error handler
        await escalate_error(task_id)
        return
    budget.next_attempt(task_id)
    # Try again
    await retry_task(task_id)
```
Methods:
- in_memory(max_attempts: int) -> RetryBudget — Create in-memory tracker
- next_attempt(key: str) -> int — Increment and return attempt count (0-based)
- can_retry(key: str) -> bool — Check if more retries available
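The next_attempt / can_retry semantics listed above can be sketched with a per-key counter (illustrative only; the real RetryBudget lives in cogent.core and may track attempts differently):

```python
from collections import defaultdict

class SketchBudget:
    def __init__(self, max_attempts: int) -> None:
        self.max_attempts = max_attempts
        self._attempts: defaultdict[str, int] = defaultdict(int)

    def next_attempt(self, key: str) -> int:
        # Returns 0 on the first call, 1 on the second, and so on.
        attempt = self._attempts[key]
        self._attempts[key] += 1
        return attempt

    def can_retry(self, key: str) -> bool:
        return self._attempts[key] < self.max_attempts

budget = SketchBudget(max_attempts=3)
print(budget.next_attempt("task-1"))  # 0
print(budget.next_attempt("task-1"))  # 1
print(budget.can_retry("task-1"))     # True (2 of 3 attempts used)
```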
jittered_delay¶
Calculate exponential backoff with jitter:
```python
import asyncio
import random

from cogent.core import jittered_delay

attempt = 2
base_delay = 2 ** attempt  # 4 seconds
jitter = random.uniform(-1, 1)

delay = jittered_delay(
    base_seconds=base_delay,
    jitter_seconds=jitter,
    min_seconds=1.0,
    max_seconds=60.0,
)
await asyncio.sleep(delay)
```
Stopwatch¶
Performance timing helper:
```python
from cogent.core import Stopwatch

stopwatch = Stopwatch()
await do_work()
elapsed = stopwatch.elapsed_s
print(f"Work completed in {elapsed:.2f}s")
```
Utilities¶
generate_id¶
Generate unique identifiers:
```python
from cogent.core import generate_id

task_id = generate_id()                # "task_a1b2c3d4"
agent_id = generate_id(prefix="agent") # "agent_e5f6g7h8"
```
Timestamps¶
```python
from cogent.core import now_utc, now_local, to_local, format_timestamp

# Get current time
utc_now = now_utc()      # datetime in UTC
local_now = now_local()  # datetime in local timezone

# Convert UTC to local
local_time = to_local(utc_now)

# Format for display
formatted = format_timestamp(utc_now)  # "2024-12-04 10:30:45 UTC"
```
Response Protocol¶
New in v1.13.0 — Unified response protocol for all agent operations with consistent metadata, observability, and error handling.
Response[T]¶
Generic container for agent responses with full metadata:
```python
from cogent import Agent
from cogent.core.response import Response, ResponseError

agent = Agent(name="analyst", model=model)

# Agent.run() and Agent.think() return Response[T]
response = await agent.think("Analyze sales data")

# Access response data
result: str = response.content
success: bool = response.success

# Access metadata
tokens = response.metadata.tokens.total_tokens
duration = response.metadata.duration
model_name = response.metadata.model
correlation_id = response.metadata.correlation_id

# Access tool calls with timing
for tool_call in response.tool_calls:
    print(f"{tool_call.tool_name}: {tool_call.duration}s")

# Access conversation history
for message in response.messages:
    print(f"{message.role}: {message.content}")

# Unwrap or handle errors
try:
    result = response.unwrap()  # Returns content or raises
except ResponseError as e:
    print(f"Error: {e.response.error.message}")
```
Response Types¶
TokenUsage¶
Token consumption tracking with full breakdown:
```python
from cogent.core.response import TokenUsage

tokens = response.metadata.tokens
print(f"Prompt: {tokens.prompt_tokens}")
print(f"Completion: {tokens.completion_tokens}")
print(f"Total: {tokens.total_tokens}")

# Reasoning tokens (if model supports it)
if tokens.reasoning_tokens:
    print(f"Reasoning: {tokens.reasoning_tokens}")
```
Attributes:
- prompt_tokens: Input tokens
- completion_tokens: Output tokens
- total_tokens: Sum of prompt + completion
- reasoning_tokens: Reasoning/thinking tokens (o1, o3, deepseek-reasoner, Claude extended thinking, Gemini thinking, Grok)
Note: In multi-agent scenarios, all token categories are aggregated across coordinator and subagents.
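The aggregation can be pictured as a field-wise sum of per-agent usage. A sketch with an illustrative dataclass (not cogent's TokenUsage; the numbers are made up):

```python
from dataclasses import dataclass

@dataclass
class Usage:
    prompt_tokens: int
    completion_tokens: int

    @property
    def total_tokens(self) -> int:
        return self.prompt_tokens + self.completion_tokens

coordinator = Usage(prompt_tokens=120, completion_tokens=40)
subagent = Usage(prompt_tokens=300, completion_tokens=90)

# Aggregate each category across coordinator and subagents
combined = Usage(
    prompt_tokens=coordinator.prompt_tokens + subagent.prompt_tokens,
    completion_tokens=coordinator.completion_tokens + subagent.completion_tokens,
)
print(combined.total_tokens)  # 550
```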
ToolCall¶
Tool invocation tracking with timing:
```python
from cogent.core.response import ToolCall

for call in response.tool_calls:
    print(f"Tool: {call.tool_name}")
    print(f"Duration: {call.duration}s")
    print(f"Success: {call.success}")
    if call.error:
        print(f"Error: {call.error}")
```
ResponseMetadata¶
Consistent metadata across all responses:
```python
from cogent.core.response import ResponseMetadata

metadata = response.metadata
# agent: Agent that generated response
# model: Model used (e.g., "gpt-4")
# tokens: Token usage (if available)
# duration: Execution time in seconds
# timestamp: Unix timestamp
# correlation_id: For distributed tracing
# trace_id: Trace identifier
```
ErrorInfo¶
Structured error information:
```python
from cogent.core.response import ErrorInfo

if not response.success:
    error = response.error
    print(f"Type: {error.type}")
    print(f"Message: {error.message}")
    if error.traceback:
        print(f"Traceback: {error.traceback}")
```
Response Features¶
Serialization¶
```python
# Convert to dictionary
data = response.to_dict()
# {
#   "content": "...",
#   "success": true,
#   "metadata": {...},
#   "tool_calls": [...],
#   "messages": [...],
#   ...
# }
```
Event Conversion¶
```python
from cogent.events import Event

# Create event from response
event = Event.from_response(
    response,
    name="analysis.done",
    source="analyst",
)

# Event includes response metadata
assert event.data["content"] == response.content
assert event.metadata["tokens"]["total"] == response.metadata.tokens.total_tokens
```
Benefits¶
Observability:
- Full conversation history with response.messages
- Token usage tracking across all operations
- Tool call timing and success/failure tracking
- Correlation IDs for distributed tracing
Debugging:
- Inspect exact prompts sent to LLM
- See all tool invocations with results
- Track execution timing per operation
- Access full error context with tracebacks
Consistency:
- Same response format for Agent.run(), Agent.think(), and A2A
- Predictable error handling across all agent operations
- Unified metadata structure
Integration:
- Seamless conversion to Events for orchestration
- Works with all executor types (Native, ReAct, ChainOfThought)
Exports¶
```python
from cogent.core import (
    # Enums
    TaskStatus,
    AgentStatus,
    EventType,
    Priority,
    AgentRole,
    # Context
    RunContext,
    EMPTY_CONTEXT,
    # Utilities
    generate_id,
    now_utc,
    now_local,
    to_local,
    format_timestamp,
    # Reactive utilities
    IdempotencyGuard,
    RetryBudget,
    emit_later,
    jittered_delay,
    Stopwatch,
)

from cogent.core.messages import (
    BaseMessage,
    SystemMessage,
    HumanMessage,
    AIMessage,
    ToolMessage,
    messages_to_dict,
    parse_openai_response,
)

from cogent.core.response import (
    Response,
    ResponseMetadata,
    TokenUsage,
    ToolCall,
    ErrorInfo,
    ResponseError,
)
```