Interceptors Module

The cogent.interceptors module provides composable units that intercept agent execution for cross-cutting concerns like cost control, security, context management, and observability.

Overview

Interceptors are middleware that wrap agent execution at specific phases:

- Before LLM call - Modify context, filter tools, check budgets
- After LLM call - Validate responses, mask PII, audit
- Before tool call - Gate access, apply rate limits and retry logic
- After tool call - Post-process results, aggregate data

from cogent import Agent
from cogent.interceptors import BudgetGuard, PIIShield

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        BudgetGuard(max_model_calls=10, max_tool_calls=50),
        PIIShield(patterns=["email", "ssn"]),
    ],
)

Core Concepts

Interceptor Lifecycle

Interceptors run at specific phases:

User Query
[BEFORE_LLM] ← Context modification, validation
LLM Call
[AFTER_LLM] ← Response validation, PII masking
[BEFORE_TOOL] ← Tool gating, rate limiting
Tool Execution
[AFTER_TOOL] ← Result post-processing
Response to User

Phase Enum

from cogent.interceptors import Phase

Phase.BEFORE_LLM    # Before sending to LLM
Phase.AFTER_LLM     # After receiving LLM response
Phase.BEFORE_TOOL   # Before tool execution
Phase.AFTER_TOOL    # After tool execution

InterceptResult

Interceptors return a result that can:

- Continue - Proceed to next interceptor/phase
- Modify - Change the data and continue
- Stop - Halt execution with a response

from cogent.interceptors import InterceptResult

# Continue unchanged
return InterceptResult.continue_()

# Modify and continue
return InterceptResult.modify(new_messages=modified_messages)

# Stop execution
return InterceptResult.stop(response="Cannot proceed: budget exceeded")
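
The three result kinds determine how a runner folds the chain together. As a self-contained sketch of those semantics (the `Result` class and `run_chain` function below are illustrative stand-ins, not cogent's actual internals):

```python
from dataclasses import dataclass
from typing import Any, Callable

@dataclass
class Result:
    kind: str                 # "continue", "modify", or "stop"
    data: Any = None

def run_chain(interceptors: list[Callable[[Any], Result]], data: Any) -> Any:
    """Fold each interceptor's result into the running data."""
    for interceptor in interceptors:
        result = interceptor(data)
        if result.kind == "stop":
            return result.data        # short-circuit with the stop response
        if result.kind == "modify":
            data = result.data        # continue with the modified data
    return data

# Example: one modifier, one pass-through
add_note = lambda msgs: Result("modify", msgs + ["Be concise."])
noop = lambda msgs: Result("continue")
print(run_chain([add_note, noop], ["hello"]))  # ['hello', 'Be concise.']
```

A "stop" anywhere in the list prevents every later interceptor from running, which is why ordering matters when combining interceptors.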

Built-in Interceptors

BudgetGuard

Control costs by limiting LLM and tool calls:

from cogent.interceptors import BudgetGuard

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        BudgetGuard(
            max_model_calls=10,     # Max LLM invocations
            max_tool_calls=50,      # Max tool executions
            max_tokens=100000,      # Max token usage
            on_exceeded="stop",     # "stop" or "warn"
        ),
    ],
)

# Check budget status
guard = agent.interceptors[0]
print(f"Calls: {guard.model_calls}/{guard.max_model_calls}")
print(f"Tokens: {guard.tokens_used}/{guard.max_tokens}")

PIIShield

Detect and handle PII in inputs/outputs:

from cogent.interceptors import PIIShield, PIIAction

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        PIIShield(
            patterns=["email", "phone", "ssn", "credit_card"],
            action=PIIAction.MASK,  # MASK, REDACT, or BLOCK
        ),
    ],
)

# Input: "Contact john@email.com"
# Masked: "Contact [EMAIL]"

Actions:

| Action | Behavior |
| --- | --- |
| PIIAction.MASK | Replace with [TYPE] placeholder |
| PIIAction.REDACT | Remove entirely |
| PIIAction.BLOCK | Stop execution with error |
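
The MASK and REDACT behaviors can be approximated with plain regular expressions. The patterns below are illustrative only, not the patterns PIIShield actually ships with:

```python
import re

# Illustrative PII patterns -- real detectors are more thorough.
PII_PATTERNS = {
    "email": re.compile(r"[\w.+-]+@[\w-]+\.[\w.]+"),
    "ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
}

def mask(text: str) -> str:
    """MASK behavior: replace each match with a [TYPE] placeholder."""
    for name, pattern in PII_PATTERNS.items():
        text = pattern.sub(f"[{name.upper()}]", text)
    return text

def redact(text: str) -> str:
    """REDACT behavior: remove matches entirely."""
    for pattern in PII_PATTERNS.values():
        text = pattern.sub("", text)
    return text

print(mask("Contact john@email.com"))  # Contact [EMAIL]
```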

ContentFilter

Filter harmful or inappropriate content:

from cogent.interceptors import ContentFilter

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        ContentFilter(
            block_patterns=["password", "secret key"],
            allow_patterns=["public api"],  # Whitelist
        ),
    ],
)

TokenLimiter

Limit context size to fit model constraints:

from cogent.interceptors import TokenLimiter

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        TokenLimiter(
            max_tokens=8000,        # Max context tokens
            strategy="truncate",   # "truncate" or "summarize"
            keep_system=True,      # Always keep system message
            keep_last_n=5,         # Keep last N messages
        ),
    ],
)
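
The "truncate" strategy can be sketched in a few lines. The function below illustrates the keep_system/keep_last_n behavior with word counts standing in for real token counts; it is not TokenLimiter's implementation:

```python
def truncate(messages: list[dict], max_tokens: int, keep_last_n: int,
             keep_system: bool = True) -> list[dict]:
    """Keep the system message and the last N messages, then fill the
    remaining budget with older messages, newest first."""
    system = [m for m in messages if m["role"] == "system"] if keep_system else []
    rest = [m for m in messages if m["role"] != "system"]
    kept = rest[-keep_last_n:]                       # always keep the tail
    budget = max_tokens - sum(len(m["content"].split()) for m in system + kept)
    head: list[dict] = []
    for m in reversed(rest[:-keep_last_n]):          # walk backwards from the cut
        cost = len(m["content"].split())
        if budget - cost < 0:
            break
        head.insert(0, m)
        budget -= cost
    return system + head + kept
```

A "summarize" strategy would instead replace the dropped head with an LLM-generated summary message rather than discarding it.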

ContextCompressor

Compress context to reduce token usage:

from cogent.interceptors import ContextCompressor

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        ContextCompressor(
            model=model,           # LLM for summarization
            trigger_tokens=6000,   # Compress when above this
            target_tokens=3000,    # Target after compression
        ),
    ],
)

Tool Control

ToolGate

Control which tools are available:

from cogent.interceptors import ToolGate

agent = Agent(
    name="assistant",
    model=model,
    tools=[search, write_file, delete_file],
    intercept=[
        ToolGate(
            allow=["search"],           # Only these tools
            # Or: deny=["delete_file"], # Block these tools
        ),
    ],
)

Dynamic gating:

from cogent.interceptors import InterceptContext

def gate_by_user(ctx: InterceptContext) -> list[str]:
    if ctx.run_context.get("is_admin"):
        return ["*"]  # All tools
    return ["search", "read_file"]

agent = Agent(
    intercept=[ToolGate(allow=gate_by_user)],
)

PermissionGate

Role-based tool permissions:

from cogent.interceptors import PermissionGate

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        PermissionGate(
            permissions={
                "admin": ["*"],
                "user": ["search", "read"],
                "guest": ["search"],
            },
            get_role=lambda ctx: ctx.run_context.get("role", "guest"),
        ),
    ],
)

ConversationGate

Enable tools based on conversation state:

from cogent.interceptors import ConversationGate

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        ConversationGate(
            # Unlock tools after specific messages
            unlock_on={
                "confirmed": ["execute_order"],
                "authenticated": ["view_account", "transfer"],
            },
        ),
    ],
)

Resilience

RateLimiter

Limit request rates:

from cogent.interceptors import RateLimiter

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        RateLimiter(
            max_requests=10,       # Max requests
            window_seconds=60,     # Per time window
            on_exceeded="wait",    # "wait" or "error"
        ),
    ],
)

Failover

Automatic model failover:

from cogent.interceptors import Failover, FailoverTrigger
from cogent.models import ChatModel

agent = Agent(
    name="assistant",
    model=primary_model,
    intercept=[
        Failover(
            fallback_models=[
                ChatModel(model="gpt-5.4-mini"),
                ChatModel(model="gpt-3.5-turbo"),
            ],
            triggers=[
                FailoverTrigger.RATE_LIMIT,
                FailoverTrigger.TIMEOUT,
                FailoverTrigger.ERROR,
            ],
            max_retries=2,
        ),
    ],
)

CircuitBreaker

Prevent cascade failures:

from cogent.interceptors import CircuitBreaker

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        CircuitBreaker(
            failure_threshold=5,   # Failures before opening
            recovery_timeout=60,   # Seconds before retry
            half_open_requests=2,  # Test requests when recovering
        ),
    ],
)
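
The three parameters map onto the classic closed/open/half-open state machine. The toy breaker below illustrates those transitions; it is not cogent's internal implementation:

```python
import time

class Breaker:
    """Closed -> open after failure_threshold failures; open -> half-open
    after recovery_timeout; half-open -> closed after enough successes."""

    def __init__(self, failure_threshold=5, recovery_timeout=60, half_open_requests=2):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.half_open_requests = half_open_requests
        self.failures = 0
        self.successes = 0
        self.state = "closed"
        self.opened_at = 0.0

    def allow(self, now=None) -> bool:
        now = time.monotonic() if now is None else now
        if self.state == "open" and now - self.opened_at >= self.recovery_timeout:
            self.state = "half_open"      # let test requests through
            self.successes = 0
        return self.state != "open"

    def record(self, ok: bool, now=None) -> None:
        now = time.monotonic() if now is None else now
        if ok:
            if self.state == "half_open":
                self.successes += 1
                if self.successes >= self.half_open_requests:
                    self.state, self.failures = "closed", 0
        else:
            self.failures += 1
            if self.state == "half_open" or self.failures >= self.failure_threshold:
                self.state, self.opened_at = "open", now
```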

ToolGuard

Per-tool retry and circuit breaker:

from cogent.interceptors import ToolGuard

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        ToolGuard(
            tool_configs={
                "search": {
                    "max_retries": 3,
                    "backoff": "exponential",
                    "circuit_breaker": True,
                },
                "database": {
                    "max_retries": 1,
                    "timeout": 30,
                },
            },
        ),
    ],
)
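
The max_retries/backoff pair boils down to a retry loop with exponentially growing delays. A minimal sketch (the `sleep` parameter is injectable so the example runs instantly; this is not ToolGuard's implementation):

```python
import time

def call_with_retry(fn, max_retries=3, base_delay=0.5, sleep=time.sleep):
    """Retry fn up to max_retries times with exponential backoff."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_retries:
                raise                          # out of retries: propagate
            sleep(base_delay * (2 ** attempt)) # 0.5s, 1s, 2s, ...

attempts = []
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("transient")
    return "ok"

print(call_with_retry(flaky, sleep=lambda s: None))  # ok
```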

Auditing

Auditor

Log all agent activity:

from cogent.interceptors import Auditor, AuditEventType

async def log_event(event):
    print(f"[{event.type}] {event.agent}: {event.data}")

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        Auditor(
            handler=log_event,
            events=[
                AuditEventType.LLM_REQUEST,
                AuditEventType.LLM_RESPONSE,
                AuditEventType.TOOL_CALL,
                AuditEventType.TOOL_RESULT,
            ],
            include_content=True,  # Log message content
        ),
    ],
)

Prompt Adapters

ContextPrompt

Inject dynamic context into system prompt:

from datetime import datetime

from cogent.interceptors import ContextPrompt

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        ContextPrompt(
            template="""Current time: {time}
User timezone: {timezone}
User preferences: {preferences}""",
            get_context=lambda ctx: {
                "time": datetime.now().isoformat(),
                "timezone": ctx.run_context.get("timezone", "UTC"),
                "preferences": ctx.run_context.get("preferences", {}),
            },
        ),
    ],
)

ConversationPrompt

Add conversation-aware context:

from cogent.interceptors import ConversationPrompt

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        ConversationPrompt(
            summary_threshold=20,  # Summarize after N messages
            include_summary=True,
            model=model,
        ),
    ],
)

LambdaPrompt

Custom prompt modification:

from cogent.interceptors import LambdaPrompt

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        LambdaPrompt(
            modifier=lambda messages, ctx: [
                {**m, "content": m["content"].upper()}
                if m["role"] == "user" else m
                for m in messages
            ],
        ),
    ],
)

Custom Interceptors

Basic Structure

from cogent.interceptors import Interceptor, Phase, InterceptContext, InterceptResult

class MyInterceptor(Interceptor):
    """Custom interceptor example."""

    phases = [Phase.BEFORE_LLM, Phase.AFTER_LLM]

    async def intercept(
        self,
        phase: Phase,
        context: InterceptContext,
    ) -> InterceptResult:
        if phase == Phase.BEFORE_LLM:
            # Modify messages before LLM
            messages = context.messages
            messages.append({"role": "system", "content": "Be concise."})
            return InterceptResult.modify(new_messages=messages)

        elif phase == Phase.AFTER_LLM:
            # Log response
            print(f"Response: {context.response.content}")
            return InterceptResult.continue_()

InterceptContext

Available context in interceptors:

@dataclass
class InterceptContext:
    agent: Agent                  # Current agent
    phase: Phase                  # Current phase
    messages: list[dict]          # Current messages
    response: AIMessage | None    # LLM response (after phases)
    tool_call: dict | None        # Tool call info (tool phases)
    tool_result: Any | None       # Tool result (AFTER_TOOL)
    run_context: RunContext       # User-provided context
    metadata: dict                # Additional data

Stateful Interceptors

class ConversationTracker(Interceptor):
    """Track conversation statistics."""

    phases = [Phase.AFTER_LLM]

    def __init__(self):
        self.message_count = 0
        self.total_tokens = 0

    async def intercept(
        self,
        phase: Phase,
        context: InterceptContext,
    ) -> InterceptResult:
        self.message_count += 1
        if context.response and context.response.usage:
            self.total_tokens += context.response.usage.get("total_tokens", 0)
        return InterceptResult.continue_()

    def stats(self) -> dict:
        return {
            "messages": self.message_count,
            "tokens": self.total_tokens,
        }

Combining Interceptors

Interceptors execute in order. Use StopExecution to halt the chain:

from cogent.interceptors import StopExecution

agent = Agent(
    name="assistant",
    model=model,
    intercept=[
        # Order matters - these run sequentially
        PIIShield(patterns=["ssn"]),       # First: mask PII
        BudgetGuard(max_model_calls=10),   # Second: check budget
        ToolGate(allow=["search"]),        # Third: filter tools
        Auditor(handler=log),              # Last: audit all activity
    ],
)

# If BudgetGuard exceeds limit, it raises StopExecution
# and Auditor never runs for that call

API Reference

Core Classes

| Class | Description |
| --- | --- |
| Interceptor | Base class for all interceptors |
| InterceptContext | Context passed to interceptors |
| InterceptResult | Return type from intercept method |
| Phase | Enum of interception phases |
| StopExecution | Exception to halt execution |

Built-in Interceptors

| Category | Interceptors |
| --- | --- |
| Budget | BudgetGuard |
| Security | PIIShield, ContentFilter |
| Context | TokenLimiter, ContextCompressor |
| Gates | ToolGate, PermissionGate, ConversationGate |
| Rate Limit | RateLimiter, ThrottleInterceptor |
| Resilience | Failover, CircuitBreaker, ToolGuard |
| Audit | Auditor |
| Prompts | ContextPrompt, ConversationPrompt, LambdaPrompt |