Skip to content

Core Module

The cogent.core module provides foundational types, enums, utilities, and dependency injection used throughout the framework.

Overview

The core module defines: - Enums for status types, event types, and roles - Native message types compatible with all LLM providers - Utility functions for IDs, timestamps, etc. - RunContext for dependency injection (tools and interceptors) - Reactive utilities for event-driven systems (idempotency, retries, delays)

from cogent.core import (
    # Enums
    TaskStatus,
    AgentStatus,
    EventType,
    Priority,
    AgentRole,
    # Context
    RunContext,
    EMPTY_CONTEXT,
    # Utilities
    generate_id,
    now_utc,
    # Reactive utilities
    IdempotencyGuard,
    RetryBudget,
    emit_later,
    jittered_delay,
    Stopwatch,
)

Enums

TaskStatus

Task lifecycle states:

from cogent.core import TaskStatus

status = TaskStatus.RUNNING

# Check state categories
status.is_terminal()  # COMPLETED, FAILED, CANCELLED
status.is_active()    # RUNNING, SPAWNING
Status Description
PENDING Task created, not yet scheduled
SCHEDULED Task scheduled for execution
BLOCKED Task waiting on dependencies
RUNNING Task actively executing
SPAWNING Task creating subtasks
COMPLETED Task finished successfully
FAILED Task failed with error
CANCELLED Task was cancelled

AgentStatus

Agent lifecycle states:

from cogent.core import AgentStatus

status = AgentStatus.THINKING

# Check state categories
status.is_available()  # Can accept new work (IDLE)
status.is_working()    # Currently working (THINKING, ACTING)
Status Description
IDLE Agent ready for new work
THINKING Agent reasoning/planning
ACTING Agent executing tools
WAITING Agent waiting for response
ERROR Agent in error state
OFFLINE Agent unavailable

EventType

System events for observability and coordination:

from cogent.core import EventType

event = EventType.TASK_COMPLETED
event.category  # "task"

Categories:

Category Examples
system SYSTEM_STARTED, SYSTEM_STOPPED, SYSTEM_ERROR
task TASK_CREATED, TASK_STARTED, TASK_COMPLETED, TASK_FAILED
subtask SUBTASK_SPAWNED, SUBTASK_COMPLETED
agent AGENT_THINKING, AGENT_ACTING, AGENT_RESPONDED
tool TOOL_CALLED, TOOL_RESULT, TOOL_ERROR
llm LLM_REQUEST, LLM_RESPONSE, LLM_TOOL_DECISION
stream STREAM_START, TOKEN_STREAMED, STREAM_END
plan PLAN_CREATED, PLAN_STEP_STARTED, PLAN_STEP_COMPLETED
message MESSAGE_SENT, MESSAGE_RECEIVED

Priority

Task priority levels (comparable):

from cogent.core import Priority

# Priorities are comparable
Priority.HIGH > Priority.NORMAL  # True
Priority.LOW < Priority.CRITICAL  # True
Priority Value Description
LOW 1 Background tasks
NORMAL 2 Standard priority
HIGH 3 Important tasks
CRITICAL 4 Urgent tasks

AgentRole

Agent roles define capabilities:

from cogent.core import AgentRole

role = AgentRole.SUPERVISOR
role.can_finish     # True
role.can_delegate   # True
role.can_use_tools  # False
Role can_finish can_delegate can_use_tools
WORKER
SUPERVISOR
AUTONOMOUS
REVIEWER

Messages

Native message types compatible with OpenAI/Anthropic/etc. APIs:

SystemMessage

from cogent.core.messages import SystemMessage

msg = SystemMessage("You are a helpful assistant.")
msg.to_dict()  # {"role": "system", "content": "..."}

HumanMessage

from cogent.core.messages import HumanMessage

msg = HumanMessage("Hello, how are you?")
msg.to_dict()  # {"role": "user", "content": "..."}

AIMessage

from cogent.core.messages import AIMessage

# Text response
msg = AIMessage(content="I'm doing well!")

# Response with tool calls
msg = AIMessage(
    content="",
    tool_calls=[
        {"id": "call_1", "name": "search", "args": {"query": "weather"}}
    ],
)
msg.to_dict()  # Includes properly formatted tool_calls

ToolMessage

from cogent.core.messages import ToolMessage

msg = ToolMessage(
    content='{"temperature": 72}',
    tool_call_id="call_1",
)

Helper Functions

from cogent.core.messages import (
    messages_to_dict,
    parse_openai_response,
)

# Convert message list for API calls
messages = [SystemMessage("..."), HumanMessage("...")]
api_messages = messages_to_dict(messages)

# Parse OpenAI response into AIMessage
ai_msg = parse_openai_response(openai_response)

Context (Dependency Injection)

RunContext

Base class for invocation-scoped context that provides dependency injection for tools and interceptors.

from dataclasses import dataclass
from cogent import Agent, tool
from cogent.core import RunContext

@dataclass
class AppContext(RunContext):
    user_id: str
    db: Any  # Your database connection
    api_key: str

@tool
def get_user_data(ctx: RunContext) -> str:
    """Get data for the current user."""
    user = ctx.db.get_user(ctx.user_id)
    return f"User: {user.name}"

agent = Agent(name="assistant", model=model, tools=[get_user_data])

# Pass context at invocation time
result = await agent.run(
    "Get my profile data",
    context=AppContext(user_id="123", db=db, api_key=key),
)

Key Features: - Type-safe context data passed to tools and interceptors - No global state — context scoped to single invocation - Access via ctx: RunContext parameter in tools - Available in interceptors via InterceptContext.run_context

Methods: - get(key, default) — Get metadata value by key - with_metadata(**kwargs) — Create new context with additional metadata


Reactive Utilities

Event-driven utilities for building robust reactive systems.

IdempotencyGuard

In-memory idempotency guard to ensure side-effects execute only once per key:

from cogent.core import IdempotencyGuard

guard = IdempotencyGuard()

async def process_event(event_id: str, data: dict):
    if not await guard.claim(event_id):
        return  # Already processed

    # Process event exactly once
    await do_work(data)

Methods: - claim(key: str) -> bool — Atomically claim a key (returns True if first time) - run_once(key: str, fn: Callable) -> tuple[bool, Any] — Run coroutine exactly once per key

Note: Process-local memory. For distributed systems, back with Redis/database.

RetryBudget

Bounded retry tracker for exponential backoff and retry policies:

from cogent.core import RetryBudget

budget = RetryBudget.in_memory(max_attempts=3)

async def handle_with_retries(task_id: str):
    attempt = budget.next_attempt(task_id)

    if attempt >= 3:
        # Escalate to error handler
        await escalate_error(task_id)
        return

    # Try again
    await retry_task(task_id)

Methods: - in_memory(max_attempts: int) -> RetryBudget — Create in-memory tracker - next_attempt(key: str) -> int — Increment and return attempt count (0-based) - can_retry(key: str) -> bool — Check if more retries available

jittered_delay

Calculate exponential backoff with jitter:

from cogent.core import jittered_delay
import random

attempt = 2
base_delay = 2 ** attempt  # 4 seconds
jitter = random.uniform(-1, 1)

delay = jittered_delay(
    base_seconds=base_delay,
    jitter_seconds=jitter,
    min_seconds=1.0,
    max_seconds=60.0,
)

await asyncio.sleep(delay)

Stopwatch

Performance timing helper:

from cogent.core import Stopwatch

stopwatch = Stopwatch()

await do_work()

elapsed = stopwatch.elapsed_s
print(f"Work completed in {elapsed:.2f}s")

Utilities

generate_id

Generate unique identifiers:

from cogent.core import generate_id

task_id = generate_id()  # "task_a1b2c3d4"
agent_id = generate_id(prefix="agent")  # "agent_e5f6g7h8"

Timestamps

from cogent.core import now_utc, now_local, to_local, format_timestamp

# Get current time
utc_now = now_utc()      # datetime in UTC
local_now = now_local()  # datetime in local timezone

# Convert UTC to local
local_time = to_local(utc_now)

# Format for display
formatted = format_timestamp(utc_now)  # "2024-12-04 10:30:45 UTC"

Response Protocol

New in v1.13.0 — Unified response protocol for all agent operations with consistent metadata, observability, and error handling.

Response[T]

Generic container for agent responses with full metadata:

from cogent import Agent
from cogent.core.response import Response

agent = Agent(name="analyst", model=model)

# Agent.run() and Agent.think() return Response[T]
response = await agent.think("Analyze sales data")

# Access response data
result: str = response.content
success: bool = response.success

# Access metadata
tokens = response.metadata.tokens.total_tokens
duration = response.metadata.duration
model = response.metadata.model
correlation_id = response.metadata.correlation_id

# Access tool calls with timing
for tool_call in response.tool_calls:
    print(f"{tool_call.tool_name}: {tool_call.duration}s")

# Access conversation history
for message in response.messages:
    print(f"{message.role}: {message.content}")

# Unwrap or handle errors
try:
    result = response.unwrap()  # Returns content or raises
except ResponseError as e:
    print(f"Error: {e.response.error.message}")

Response Types

TokenUsage

Token consumption tracking with full breakdown:

from cogent.core.response import TokenUsage

tokens = response.metadata.tokens
print(f"Prompt: {tokens.prompt_tokens}")
print(f"Completion: {tokens.completion_tokens}")
print(f"Total: {tokens.total_tokens}")

# Reasoning tokens (if model supports it)
if tokens.reasoning_tokens:
    print(f"Reasoning: {tokens.reasoning_tokens}")

Attributes: - prompt_tokens: Input tokens - completion_tokens: Output tokens
- total_tokens: Sum of prompt + completion - reasoning_tokens: Reasoning/thinking tokens (o1, o3, deepseek-reasoner, Claude extended thinking, Gemini thinking, Grok)

Note: In multi-agent scenarios, all token categories are aggregated across coordinator and subagents.

ToolCall

Tool invocation tracking with timing:

from cogent.core.response import ToolCall

for call in response.tool_calls:
    print(f"Tool: {call.tool_name}")
    print(f"Duration: {call.duration}s")
    print(f"Success: {call.success}")
    if call.error:
        print(f"Error: {call.error}")

ResponseMetadata

Consistent metadata across all responses:

from cogent.core.response import ResponseMetadata

metadata = response.metadata
# agent: Agent that generated response
# model: Model used (e.g., "gpt-4")
# tokens: Token usage (if available)
# duration: Execution time in seconds
# timestamp: Unix timestamp
# correlation_id: For distributed tracing
# trace_id: Trace identifier

ErrorInfo

Structured error information:

from cogent.core.response import ErrorInfo

if not response.success:
    error = response.error
    print(f"Type: {error.type}")
    print(f"Message: {error.message}")
    if error.traceback:
        print(f"Traceback: {error.traceback}")

Response Features

Serialization

# Convert to dictionary
data = response.to_dict()
# {
#     "content": "...",
#     "success": true,
#     "metadata": {...},
#     "tool_calls": [...],
#     "messages": [...],
#     ...
# }

Event Conversion

from cogent.events import Event

# Create event from response
event = Event.from_response(
    response,
    name="analysis.done",
    source="analyst",
)

# Event includes response metadata
assert event.data["content"] == response.content
assert event.metadata["tokens"]["total"] == response.metadata.tokens.total_tokens

Benefits

Observability: - Full conversation history with response.messages - Token usage tracking across all operations - Tool call timing and success/failure tracking - Correlation IDs for distributed tracing

Debugging: - Inspect exact prompts sent to LLM - See all tool invocations with results - Track execution timing per operation - Access full error context with tracebacks

Consistency: - Same response format for Agent.run(), Agent.think(), and A2A - Predictable error handling across all agent operations - Unified metadata structure

Integration: - Seamless conversion to Events for orchestration - Works with all executor types (Native, ReAct, ChainOfThought)


Exports

from cogent.core import (
    # Enums
    TaskStatus,
    AgentStatus,
    EventType,
    Priority,
    AgentRole,
    # Context
    RunContext,
    EMPTY_CONTEXT,
    # Utilities
    generate_id,
    now_utc,
    now_local,
    to_local,
    format_timestamp,
    # Reactive utilities
    IdempotencyGuard,
    RetryBudget,
    emit_later,
    jittered_delay,
    Stopwatch,
)

from cogent.core.messages import (
    BaseMessage,
    SystemMessage,
    HumanMessage,
    AIMessage,
    ToolMessage,
    messages_to_dict,
    parse_openai_response,
)

from cogent.core.response import (
    Response,
    ResponseMetadata,
    TokenUsage,
    ToolCall,
    ErrorInfo,
    ResponseError,
)