Skip to content

RunContext Module

The cogent.context module provides invocation-scoped context for dependency injection into tools and interceptors.

Overview

RunContext enables passing typed data to tools and interceptors at invocation time without global state. Context automatically includes the original user query for task lineage tracking.

from dataclasses import dataclass
from cogent import Agent, RunContext, tool

@dataclass
class AppContext(RunContext):
    """Per-invocation context carrying the data the example tool reads."""

    user_id: str  # passed to ctx.db.get_user(...) by get_user_data
    db: Database  # must expose get_user(user_id)
    api_key: str  # supplied at run time; not read by the example tool

@tool
def get_user_data(ctx: RunContext) -> str:
    """Look up the current user and return a one-line summary.

    Reads ``user_id`` and ``db`` from the injected context and prints the
    original query stored on it.
    """
    profile = ctx.db.get_user(ctx.user_id)
    # ``query`` is the framework-managed field holding the original request
    print(f"Original query: {ctx.query}")
    display_name = profile.name
    return "User: " + f"{display_name}"

# model, db and key are assumed to be defined by the surrounding application.
agent = Agent(name="assistant", model=model, tools=[get_user_data])

# The context instance is supplied per call, not stored on the agent.
result = await agent.run(
    "Get my profile data",
    context=AppContext(user_id="123", db=db, api_key=key),
)

Built-in Context Fields

The base RunContext provides framework-managed fields:

Field Type Description
query str Original user query that initiated execution. Auto-populated on first agent.run().
metadata dict Extension dict for untyped data.
agent Agent | None Internal reference to executing agent.

Query Field

The query field tracks the original user request through agent delegations:

# Top-level agent: the string passed to run() is recorded as ctx.query
result = await orchestrator.run("Can I delete files?", context=ctx)
# ctx.query = "Can I delete files?"

# Delegated sub-agent (via subagents=)
# Still has: ctx.query = "Can I delete files?"
# But receives a different task parameter, e.g. "check user permissions"

This enables sub-agents to understand the broader context while working on their specific subtask.


Creating Custom Contexts

Basic Context

from dataclasses import dataclass, field
from cogent import RunContext

@dataclass
class MyContext(RunContext):
    """Context identifying the caller and the permissions granted to them."""

    user_id: str
    session_id: str
    # default_factory gives every instance its own list instead of a shared one
    permissions: list[str] = field(default_factory=list)

    def has_permission(self, perm: str) -> bool:
        """Return True when *perm* is among this context's permissions."""
        return perm in self.permissions

With Services

@dataclass
class ServiceContext(RunContext):
    """Context bundling the service handles that tools need."""

    db: Database
    cache: Redis
    api_client: APIClient
    logger: Logger

    async def get_user(self, user_id: str) -> User:
        """Return the user, consulting the cache before the database.

        On a cache miss the user is loaded from ``db`` and its JSON form is
        written back to the cache under ``user:<user_id>``.
        """
        cache_key = f"user:{user_id}"
        hit = await self.cache.get(cache_key)
        if not hit:
            fresh = await self.db.get_user(user_id)
            await self.cache.set(cache_key, fresh.to_json())
            return fresh
        return User.from_json(hit)

Using Context in Tools

Accessing Context

from cogent import tool, RunContext

@tool
def get_user_orders(ctx: RunContext) -> str:
    """Report how many orders the context's user has.

    Reads ``user_id`` and ``db`` from the injected context.
    """
    current_user = ctx.user_id
    database = ctx.db
    orders = database.get_orders(current_user)
    return f"Found {len(orders)} orders"

@tool
def admin_action(action: str, ctx: RunContext) -> str:
    """Run *action* when the context holds the ``admin`` permission.

    Returns a denial message instead of raising when the permission is missing.
    """
    if ctx.has_permission("admin"):
        return f"Performed: {action}"
    return "Permission denied"

Context with Other Parameters

@tool
def search_with_context(
    query: str,
    limit: int = 10,
    ctx: RunContext = None,  # Context is optional
) -> str:
    """Search, personalizing the results when a user is known.

    Falls back to the generic search when no context was injected or the
    context carries no ``user_id``.
    """
    if not ctx or not ctx.user_id:
        return generic_search(query, limit)
    # A user is known: personalized search
    return personalized_search(query, ctx.user_id, limit)

Using Context in Interceptors

# NOTE(review): Phase is assumed to be exported by cogent.interceptors alongside
# the other interceptor types — confirm the import path.
from cogent.interceptors import Interceptor, InterceptContext, InterceptResult, Phase

class PermissionInterceptor(Interceptor):
    """Stop execution unless the run context grants ``use_agent``."""

    async def intercept(
        self,
        phase: Phase,
        context: InterceptContext,
    ) -> InterceptResult:
        """Check the caller's permission before the agent continues.

        Returns ``InterceptResult.stop(...)`` when permission is missing and
        ``InterceptResult.continue_()`` otherwise.
        """
        run_ctx = context.run_context

        # Fail closed: a run without a context, or with a context type that
        # has no has_permission method, is treated as not permitted.
        has_permission = getattr(run_ctx, "has_permission", None)
        if has_permission is None or not has_permission("use_agent"):
            return InterceptResult.stop("Permission denied")

        return InterceptResult.continue_()

Context Metadata

The base RunContext includes a metadata dict for extension:

from datetime import datetime

from cogent import RunContext

# The base context can be used directly when only untyped metadata is needed
ctx = RunContext(metadata={
    "request_id": "req-123",
    "correlation_id": "corr-456",
    "trace_id": "trace-789",
})

# Access metadata (two equivalent spellings)
request_id = ctx.get("request_id")
request_id = ctx.metadata.get("request_id")

# Create new context with additional metadata
new_ctx = ctx.with_metadata(
    timestamp=datetime.now(),
    version="1.0",
)

Subagent Context Propagation

Context flows automatically to delegated subagents:

from dataclasses import dataclass

from cogent import Agent, RunContext, tool

@dataclass
class UserContext(RunContext):
    """Context carrying the caller's identity and permission set."""

    user_id: str
    permissions: set[str]  # read by check_permissions

@tool
def check_permissions(ctx: RunContext) -> str:
    """Report whether the context's permission set includes ``admin``.

    Also prints the original query carried on the context.
    """
    print(f"Original query: {ctx.query}")
    is_admin = "admin" in ctx.permissions
    return "Permission granted" if is_admin else "Permission denied"

# Specialist agent receives context automatically
specialist = Agent(
    name="permission_checker",
    model=model,
    tools=[check_permissions],
)

# Orchestrator delegates to specialist via subagents=
orchestrator = Agent(
    name="orchestrator",
    model=model,
    subagents=[specialist],  # Context flows automatically
)

# Context passes through entire delegation chain.
# The permission set here lacks "admin", so check_permissions would
# return "Permission denied" for this context.
result = await orchestrator.run(
    "Can I delete files?",
    context=UserContext(user_id="123", permissions={"read"}),
)

Passing Context to Agents

from cogent import Agent

# tools=[...] is a placeholder for the application's own tool list
agent = Agent(name="assistant", model=model, tools=[...])

# Pass context at run time; a fresh instance can be built for every call
result = await agent.run(
    "Perform action",
    context=MyContext(
        user_id="user-123",
        session_id="sess-456",
        permissions=["read", "write"],
    ),
)

Context Immutability

Context should be treated as immutable during execution:

# NOTE(review): the API reference shows RunContext as a plain (non-frozen)
# @dataclass, and the dataclasses module raises TypeError when a frozen
# dataclass inherits from a non-frozen one. Confirm the real base class
# accepts this definition before relying on it.
@dataclass(frozen=True)  # Enforce immutability
class ImmutableContext(RunContext):
    user_id: str
    tenant_id: str

# To "modify", create a new instance
new_ctx = ctx.with_metadata(extra="value")

Request-Scoped Context

Common pattern for web applications:

from dataclasses import dataclass

from fastapi import FastAPI, Depends, Request
from cogent import Agent, RunContext

# NOTE: AsyncSession and get_db are assumed to come from the application's
# database layer; they are not defined in this example.

app = FastAPI()

@dataclass
class RequestContext(RunContext):
    """Context built once per HTTP request and handed to the agent."""

    user_id: str
    tenant_id: str
    trace_id: str
    db: AsyncSession

async def get_context(
    request: Request,
    db: AsyncSession = Depends(get_db),
) -> RequestContext:
    """Build the per-request context from request state, headers and the DB session.

    ``user_id`` and ``tenant_id`` are read from ``request.state``, so they must
    have been set there before this dependency runs.
    """
    return RequestContext(
        user_id=request.state.user_id,
        tenant_id=request.state.tenant_id,
        # The header is optional; default to "" so trace_id is always a str
        trace_id=request.headers.get("X-Trace-ID", ""),
        db=db,
    )

@app.post("/chat")
async def chat(
    message: str,
    context: RequestContext = Depends(get_context),
):
    """Run the agent on *message* with the request-scoped context.

    Returns a dict whose ``response`` key holds the agent's output.
    """
    reply = await get_agent().run(message, context=context)
    payload = {"response": reply.output}
    return payload

Multi-Tenant Context

@dataclass
class TenantContext(RunContext):
    """Context describing the tenant on whose behalf the agent runs."""

    tenant_id: str
    tenant_config: dict
    allowed_tools: list[str]
    rate_limit: int

    def can_use_tool(self, tool_name: str) -> bool:
        """Return True when *tool_name* appears in ``allowed_tools``."""
        permitted = self.allowed_tools
        return tool_name in permitted

# Use in agent: all four TenantContext fields are required, so each is given
result = await agent.run(
    "Query",
    context=TenantContext(
        tenant_id="acme-corp",
        tenant_config={"max_tokens": 4000},
        allowed_tools=["search", "calculate"],
        rate_limit=100,
    ),
)

Testing with Context

import pytest
from dataclasses import dataclass
from unittest.mock import Mock

from cogent import RunContext

@dataclass
class OrdersTestContext(RunContext):
    """Minimal context exposing exactly what get_user_orders reads."""

    user_id: str
    db: Mock

@pytest.fixture
def test_context():
    """Context whose mocked database returns two orders."""
    db = Mock()
    # A bare Mock return value has no len(); give get_orders a real list
    db.get_orders.return_value = [{"id": 1}, {"id": 2}]
    return OrdersTestContext(user_id="test-user", db=db)

def test_tool_with_context(test_context):
    # get_user_orders is a plain (non-async) function, so it is called directly.
    # NOTE(review): __wrapped__ assumes @tool exposes the undecorated function.
    result = get_user_orders.__wrapped__(ctx=test_context)
    assert result == "Found 2 orders"
    test_context.db.get_orders.assert_called_once_with("test-user")

Common Patterns

Tracking Delegation Depth

Prevent infinite delegation loops and adapt behavior based on depth:

from dataclasses import dataclass, field
from cogent import Agent, RunContext, tool

@dataclass
class DelegationContext(RunContext):
    """Context recording how many delegation hops deep the current run is."""

    depth: int = 0  # 0 means the top-level run
    max_depth: int = 3  # tools stop delegating once depth reaches this value

@tool
def process_task(ctx: DelegationContext) -> str:
    """Pick a processing strategy from the current delegation depth.

    Returns a fixed message once ``max_depth`` is reached, a top-level
    message at depth 0, and a sub-task message otherwise.
    """
    level = ctx.depth
    if level >= ctx.max_depth:
        return "Maximum delegation depth reached - handling directly"

    # Below the limit: the strategy depends on whether this is the root run
    if level == 0:
        return "Top-level processing"
    return f"Sub-task processing at depth {ctx.depth}"

# Specialist agent that handles delegated sub-tasks
specialist = Agent(name="specialist", model=model, tools=[process_task])

# In orchestrator, increment depth when delegating
@tool
async def delegate_task(task: str, ctx: DelegationContext) -> str:
    """Delegate *task* to the specialist with the depth incremented by one.

    The child context keeps the original query, depth limit and metadata.
    """
    new_ctx = DelegationContext(
        query=ctx.query,
        depth=ctx.depth + 1,
        max_depth=ctx.max_depth,
        metadata=ctx.metadata,
    )
    # Agent.run is awaited everywhere else on this page; without the await
    # this would read .output off a coroutine object.
    result = await specialist.run(task, context=new_ctx)
    return result.output

Retry Tracking

Track retry attempts to implement fallback strategies:

@dataclass
class RetryContext(RunContext):
    """Context recording whether, and how many times, a run has been retried."""

    retry_count: int = 0  # 0 on the first attempt
    max_retries: int = 3  # upper bound checked by the retry loop
    is_retry: bool = False  # tools switch to a more conservative path when True

@tool
def fetch_data(url: str, ctx: RetryContext) -> str:
    """Fetch *url*, widening the timeout on retry attempts.

    The first attempt uses a 5 second timeout; each retry uses
    ``10 + retry_count * 5`` seconds and logs the attempt.
    """
    timeout = 5
    if ctx.is_retry:
        # More conservative on retry: longer timeout for each attempt
        timeout = 10 + (ctx.retry_count * 5)
        print(f"Retry #{ctx.retry_count} with timeout={timeout}s")

    # Fetch logic...
    return f"Data from {url}"

# On retry, create new context
if failed and retry_count < max_retries:
    retry_ctx = RetryContext(
        query=ctx.query,
        retry_count=retry_count + 1,
        is_retry=True,
        metadata=ctx.metadata,
    )
    result = agent.run(task, context=retry_ctx)

Task Lineage Tracking

Track parent-child task relationships:

from cogent.core import generate_id

@dataclass
class TaskContext(RunContext):
    """Context linking a task to its parent and to the chain of ancestors."""

    task_id: str = field(default_factory=generate_id)
    parent_task_id: str | None = None
    task_chain: list[str] = field(default_factory=list)

    @property
    def is_root_task(self) -> bool:
        """True when this task has no parent."""
        parent = self.parent_task_id
        return parent is None

    @property
    def depth(self) -> int:
        """Number of ancestor task ids recorded in ``task_chain``."""
        ancestors = self.task_chain
        return len(ancestors)

@tool
def analyze_lineage(ctx: TaskContext) -> str:
    """Describe where the current task sits in its delegation chain.

    Root tasks report only their id; other tasks report the ancestor chain
    followed by their own id.
    """
    if ctx.is_root_task:
        return f"Root task: {ctx.task_id}"

    full_path = ctx.task_chain + [ctx.task_id]
    chain = " → ".join(full_path)
    return f"Task chain ({ctx.depth} deep): {chain}"

# When delegating, build chain
async def delegate_with_lineage(task: str, ctx: TaskContext) -> str:
    """Run *task* on the specialist under a child context.

    The child gets a fresh task id, records the current task as its parent,
    and extends the chain with the current task id.
    """
    child_ctx = TaskContext(
        query=ctx.query,
        task_id=generate_id(),
        parent_task_id=ctx.task_id,
        # Concatenation builds a new list, leaving the parent's chain untouched
        task_chain=ctx.task_chain + [ctx.task_id],
        metadata=ctx.metadata,
    )
    # Agent.run must be awaited before .output can be read
    result = await specialist.run(task, context=child_ctx)
    return result.output

Execution Timing

Track execution duration and deadlines:

from datetime import datetime, timedelta
from cogent.core import now_utc

@dataclass
class TimedContext(RunContext):
    """Context that records a start time and an optional deadline."""

    started_at: datetime = field(default_factory=now_utc)
    deadline: datetime | None = None

    @property
    def elapsed_seconds(self) -> float:
        """Seconds between ``started_at`` and the current ``now_utc()``."""
        delta = now_utc() - self.started_at
        return delta.total_seconds()

    @property
    def is_overdue(self) -> bool:
        """True when a deadline is set and ``now_utc()`` is past it."""
        return self.deadline is not None and now_utc() > self.deadline

    @property
    def remaining_seconds(self) -> float | None:
        """Seconds left until the deadline, or None when no deadline is set."""
        if self.deadline is not None:
            return (self.deadline - now_utc()).total_seconds()
        return None

@tool
def long_running_task(ctx: TimedContext) -> str:
    """Task that respects deadlines.

    Aborts when the deadline has passed, switches to fast mode when fewer
    than 5 seconds remain, and otherwise reports the elapsed time.
    """
    if ctx.is_overdue:
        return "Task deadline exceeded - aborting"

    remaining = ctx.remaining_seconds
    # Compare against None explicitly: a remaining time of exactly 0.0 is
    # falsy but still means the deadline is imminent.
    if remaining is not None and remaining < 5:
        return "Running in fast mode - deadline approaching"

    return f"Normal execution (elapsed: {ctx.elapsed_seconds:.1f}s)"

# Create context with deadline
ctx = TimedContext(
    deadline=now_utc() + timedelta(seconds=30)
)
result = agent.run("Perform task", context=ctx)

Combining Patterns

Compose multiple patterns for comprehensive tracking:

@dataclass
class ExecutionContext(RunContext):
    """Context bundling task identity, delegation depth, retry state and timing."""

    # --- task identity ---
    task_id: str = field(default_factory=generate_id)
    parent_task_id: str | None = None

    # --- delegation tracking ---
    depth: int = 0
    max_depth: int = 5

    # --- retry tracking ---
    retry_count: int = 0
    is_retry: bool = False

    # --- timing ---
    started_at: datetime = field(default_factory=now_utc)
    deadline: datetime | None = None

    @property
    def can_delegate(self) -> bool:
        """True while the depth limit has not been reached."""
        return self.max_depth > self.depth

    @property
    def elapsed_seconds(self) -> float:
        """Seconds between ``started_at`` and the current ``now_utc()``."""
        delta = now_utc() - self.started_at
        return delta.total_seconds()

    def create_child_context(self) -> "ExecutionContext":
        """Build the context for a delegated task.

        The child gets a new task id and start time, points back at this task
        as its parent, sits one level deeper, and inherits the query, depth
        limit, metadata and deadline. Retry fields take their defaults.
        """
        child_fields = {
            "query": self.query,
            "task_id": generate_id(),
            "parent_task_id": self.task_id,
            "depth": self.depth + 1,
            "max_depth": self.max_depth,
            "metadata": self.metadata,
            "started_at": now_utc(),
            "deadline": self.deadline,  # propagate the parent's deadline
        }
        return ExecutionContext(**child_fields)

@tool
def smart_task(ctx: ExecutionContext) -> str:
    """Report task status from depth, retry and timing information.

    The depth limit is checked first, then the retry flag; otherwise the
    task id, depth and elapsed time are reported.
    """
    if ctx.can_delegate:
        if ctx.is_retry:
            return f"Retry attempt #{ctx.retry_count}"
        return f"Task {ctx.task_id} at depth {ctx.depth} (elapsed: {ctx.elapsed_seconds:.1f}s)"

    return "Max delegation depth reached"

API Reference

RunContext

@dataclass
class RunContext:
    """Base invocation-scoped context (reference signature; bodies omitted)."""

    # Original user query; left out of the generated repr (repr=False)
    query: str = field(default="", repr=False)
    # Extension dict for untyped data; each instance gets its own dict
    metadata: dict[str, Any] = field(default_factory=dict)

    def get(self, key: str, default: Any = None) -> Any:
        """Get metadata value by key.

        NOTE(review): *default* is presumably returned when *key* is absent
        from ``metadata`` — confirm against the implementation.
        """

    def with_metadata(self, **kwargs: Any) -> "RunContext":
        """Create new context with additional metadata.

        Each keyword argument becomes a metadata entry on the returned context.
        """

Usage Patterns

Pattern Description
Tool injection def my_tool(ctx: RunContext)
Optional context def my_tool(arg: str, ctx: RunContext = None)
Interceptor access context.run_context
Metadata access ctx.get("key") or ctx.metadata["key"]
Extend context ctx.with_metadata(key="value")
Subclass context class MyContext(RunContext)