orka.agents.base_agent module
Agent Base Classes
This module defines the foundation for all OrKa agents - the processing units that transform inputs into outputs within orchestrated workflows.
Agent Architecture
OrKa provides two agent patterns to support different implementation needs:
Modern Async Pattern (BaseAgent):
- Full async/await support for concurrent execution
- Structured output handling with automatic error wrapping
- Built-in timeout and concurrency control
- Lifecycle hooks for initialization and cleanup
- Context-aware execution with trace information
Legacy Sync Pattern (LegacyBaseAgent):
- Simple synchronous execution model
- Compatible with existing agent implementations
- Direct result return without output wrapping
- Backward compatibility for older agents
Core Concepts
Agent Lifecycle:
1. Initialization: Set up resources and validate configuration
2. Execution: Process inputs with context awareness
3. Result Handling: Structure outputs for downstream processing
4. Cleanup: Release resources and maintain system health
Context Management:
- Agents receive context dictionaries containing input data and metadata
- Trace IDs are automatically added for debugging and monitoring
- Previous outputs from other agents are available in the context
- Error information is captured and structured for debugging
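The context handling described above can be pictured with a small sketch. This is not OrKa's implementation: the helper name `with_trace` and the `trace_id` and `previous_outputs` keys are assumptions chosen for illustration, and the real context object may be shaped differently.

```python
import uuid
from typing import Any, Dict


def with_trace(ctx: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of ctx that carries a trace ID (hypothetical helper)."""
    enriched = dict(ctx)  # copy so the caller's dictionary is not mutated
    # Keep an existing trace ID so one request keeps one ID across agents.
    enriched.setdefault("trace_id", str(uuid.uuid4()))
    # Assumed key: results produced by agents that ran earlier in the workflow.
    enriched.setdefault("previous_outputs", {})
    return enriched


ctx = with_trace({"input": "How do I reset my password?"})
print(sorted(ctx))
```

Copying the dictionary before adding keys keeps the caller's data unchanged, which makes it easier to see which keys were added by the framework.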
Concurrency Control:
- Built-in concurrency manager limits parallel executions
- Configurable timeout handling prevents hanging operations
- Thread-safe execution for multi-agent workflows
- Resource pooling for efficient memory usage
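One common way to combine a concurrency limit with a timeout in asyncio is a semaphore plus `asyncio.wait_for`. The sketch below illustrates that general technique only; `ConcurrencyLimiter`, `run_with_timeout`, and `slow_echo` are hypothetical names, and OrKa's own concurrency manager may work differently.

```python
import asyncio
from typing import Optional


class ConcurrencyLimiter:
    """Hypothetical helper: caps parallel calls and applies a timeout."""

    def __init__(self, max_concurrency: int = 10) -> None:
        self._semaphore = asyncio.Semaphore(max_concurrency)

    async def run_with_timeout(self, func, timeout: Optional[float], *args):
        # Wait for a free slot, then bound the running time of the call.
        # timeout=None means "wait indefinitely".
        async with self._semaphore:
            return await asyncio.wait_for(func(*args), timeout=timeout)


async def slow_echo(value: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return value


async def demo() -> list:
    limiter = ConcurrencyLimiter(max_concurrency=2)
    results = []
    for delay in (0.01, 1.0):
        try:
            results.append(await limiter.run_with_timeout(slow_echo, 0.1, "ok", delay))
        except asyncio.TimeoutError:
            results.append("timed out")
    return results


print(asyncio.run(demo()))
```

The first call finishes well inside the 0.1 second limit, while the second is cancelled by `asyncio.wait_for` and reported as timed out.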
Implementation Patterns
Modern Agent Example:
```python
from orka.agents.base_agent import BaseAgent


class MyModernAgent(BaseAgent):
    async def _run_impl(self, ctx):
        input_data = ctx.get("input")
        # Process input asynchronously; process_async stands in for
        # the agent's own logic and is not provided by BaseAgent here.
        result = await self.process_async(input_data)
        return result
```
Legacy Agent Example:
```python
from orka.agents.base_agent import LegacyBaseAgent


class MyLegacyAgent(LegacyBaseAgent):
    def run(self, input_data):
        # Simple synchronous processing; process_sync stands in for
        # the agent's own logic.
        return self.process_sync(input_data)
```
Error Handling
Modern Agents:
- Exceptions are automatically caught and wrapped in Output objects
- Error details are preserved for debugging
- Status indicators show success/failure state
- Metadata includes agent identification
Legacy Agents:
- Exceptions propagate directly to the orchestrator
- Simple error handling for backward compatibility
- Direct return values without wrapping
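The "catch and wrap" behaviour described for modern agents can be sketched as follows. The `Output` dataclass here is a hypothetical stand-in with assumed field names (`result`, `status`, `error`, `metadata`); the actual OrKa `Output` type may have a different shape, and `run_wrapped` is not an OrKa function.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class Output:
    """Hypothetical stand-in; the real OrKa Output type may differ."""
    result: Any
    status: str
    error: Optional[str]
    metadata: dict = field(default_factory=dict)


async def run_wrapped(agent_id: str, impl, ctx: dict) -> Output:
    """Call impl(ctx) and convert any exception into an error Output."""
    try:
        result = await impl(ctx)
        return Output(result, "success", None, {"agent_id": agent_id})
    except Exception as exc:  # deliberately broad: the point is to wrap
        return Output(None, "error", str(exc), {"agent_id": agent_id})


async def failing_impl(ctx: dict) -> str:
    raise ValueError("missing input")


out = asyncio.run(run_wrapped("demo_agent", failing_impl, {}))
print(out.status, out.error)
```

With this shape, downstream code can branch on `status` instead of handling exceptions, whereas a legacy-style agent would let the `ValueError` reach the caller directly.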
Integration Features
Registry Integration:
- Agents can access shared resources through the registry
- Dependency injection for memory, embedders, and other services
- Lazy initialization of expensive resources
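Lazy initialization through a registry usually means storing a factory per resource and building the resource on first access. The following is a minimal sketch under that assumption; `LazyRegistry`, `register`, and `get` are hypothetical names and do not describe OrKa's actual `Registry` API.

```python
from typing import Any, Callable


class LazyRegistry:
    """Hypothetical registry that builds each resource on first access."""

    def __init__(self) -> None:
        self._factories = {}   # name -> zero-argument factory
        self._instances = {}   # name -> resource already built

    def register(self, name: str, factory: Callable[[], Any]) -> None:
        self._factories[name] = factory

    def get(self, name: str) -> Any:
        # Build the resource only when it is first requested, then reuse it.
        if name not in self._instances:
            self._instances[name] = self._factories[name]()
        return self._instances[name]


build_count = 0


def build_embedder() -> dict:
    """Stand-in for an expensive setup step such as loading a model."""
    global build_count
    build_count += 1
    return {"kind": "embedder"}


registry = LazyRegistry()
registry.register("embedder", build_embedder)
first = registry.get("embedder")
second = registry.get("embedder")
print(first is second, build_count)
```

An agent that receives such a registry in its constructor pays the setup cost only if it actually uses the resource.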
Orchestrator Integration:
- Agents are automatically discovered and instantiated
- Configuration is passed through constructor parameters
- Results flow between agents in workflows
Monitoring and Debugging:
- Automatic trace ID generation for request tracking
- Execution timing and performance metrics
- Comprehensive logging for troubleshooting
- class orka.agents.base_agent.BaseAgent(agent_id: str, registry: Registry | None = None, prompt: str | None = None, queue: list[str] | None = None, timeout: float | None = 30.0, max_concurrency: int = 10, **kwargs)
Bases: object
🧠 Foundation for all OrKa agents - the cognitive building blocks of your workflows.
What makes agents special:
- Intelligent Processing: Transform raw inputs into structured, meaningful outputs
- Context Preservation: Maintain conversation flow and processing history
- Async Performance: Non-blocking execution with timeout and concurrency control
- Error Intelligence: Graceful failure handling with detailed diagnostics
- Resource Management: Automatic initialization and cleanup of dependencies
Agent Lifecycle:
1. Initialization: Set up resources and validate configuration
2. Execution: Process inputs with full context awareness
3. Result Handling: Structure outputs for downstream processing
4. Cleanup: Release resources and maintain system health
Perfect for building:
- Multi-step reasoning workflows
- Intelligent content processing pipelines
- Context-aware conversational systems
- Fault-tolerant distributed AI applications
Example Usage:
```python
# Create a specialized agent
classifier = OpenAIClassificationAgent(
    agent_id="intent_classifier",
    options=["question", "request", "complaint"],
    prompt="Classify user intent: {{ input }}",
)

# Execute with context
result = await classifier.run({
    "input": "How do I reset my password?",
    "user_id": "user123",
    "session_context": previous_interactions,
})
```
- __init__(agent_id: str, registry: Registry | None = None, prompt: str | None = None, queue: list[str] | None = None, timeout: float | None = 30.0, max_concurrency: int = 10, **kwargs)
Initialize the base agent with common properties.
- Parameters:
agent_id (str) – Unique identifier for the agent
registry (Registry, optional) – Resource registry for dependency injection
prompt (str, optional) – Prompt or instruction for the agent (legacy)
queue (List[str], optional) – Queue of agents or nodes (legacy)
timeout (Optional[float]) – Maximum execution time in seconds
max_concurrency (int) – Maximum number of concurrent executions
**kwargs – Additional parameters specific to the agent type
- async initialize() → None
Initialize the agent and its resources.
This method is called automatically before the first execution and should be overridden by derived classes to set up any required resources.
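The "called automatically before the first execution" behaviour can be sketched with a stand-in base class. `LazyInitAgent` and `UppercaseAgent` below are hypothetical and do not reproduce OrKa's code; they only show how overriding `initialize()` fits together with a lazy check inside `run()`.

```python
import asyncio


class LazyInitAgent:
    """Hypothetical stand-in for an agent base class, not OrKa's code."""

    def __init__(self) -> None:
        self._initialized = False

    async def initialize(self) -> None:
        """Override in subclasses to set up resources."""

    async def run(self, ctx: dict):
        # Initialize once, right before the first execution.
        if not self._initialized:
            await self.initialize()
            self._initialized = True
        return await self._run_impl(ctx)

    async def _run_impl(self, ctx: dict):
        raise NotImplementedError


class UppercaseAgent(LazyInitAgent):
    def __init__(self) -> None:
        super().__init__()
        self.init_calls = 0

    async def initialize(self) -> None:
        self.init_calls += 1  # e.g. open a client or load a model here

    async def _run_impl(self, ctx: dict):
        return str(ctx.get("input", "")).upper()


async def demo():
    agent = UppercaseAgent()
    first = await agent.run({"input": "hello"})
    second = await agent.run({"input": "again"})
    return first, second, agent.init_calls


print(asyncio.run(demo()))
```

This sketch does not guard against two first calls running concurrently; a version meant for parallel use would likely protect the check with an `asyncio.Lock`.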
- async run(ctx: Context | Any) → Output | Any
Run the agent with the given context.
This method handles the execution workflow including:
- Lazy initialization of the agent
- Adding trace information to the context
- Managing concurrency and timeouts
- Standardizing error handling and result formatting
- Parameters:
ctx – The execution context containing input and metadata. Can be a Context object for modern agents or any input for legacy agents.
- Returns:
Standardized output for modern agents or direct result for legacy agents
- Return type:
Output or Any
- class orka.agents.base_agent.LegacyBaseAgent(agent_id, prompt, queue, **kwargs)
Bases: ABC, BaseAgent
Abstract base class for legacy agents in the OrKa framework. Provides compatibility with the older synchronous agent pattern.
New agent implementations should use BaseAgent directly with async methods. This class exists only for backward compatibility.
- __init__(agent_id, prompt, queue, **kwargs)
Initialize the legacy base agent.
- Parameters:
agent_id (str) – Unique identifier for the agent.
prompt (str) – Prompt or instruction for the agent.
queue (list) – Queue of agents or nodes to be processed.
**kwargs – Additional parameters specific to the agent type.
- abstractmethod run(input_data)
Abstract method to run the agent’s reasoning process. Must be implemented by all concrete agent classes.
- Parameters:
input_data – Input data for the agent to process.
- Returns:
The result of the agent’s processing.
- Raises:
NotImplementedError – If not implemented by a subclass.
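As a general Python note on the abstract `run()` contract: when a class derives from `ABC` and marks a method with `@abstractmethod`, a subclass that omits the method normally cannot be instantiated at all, and Python raises `TypeError` at construction time. The sketch below uses a hypothetical stand-in class (`SyncAgentBase`) rather than `LegacyBaseAgent` itself, so it shows the language mechanism and not a verified OrKa behaviour.

```python
from abc import ABC, abstractmethod


class SyncAgentBase(ABC):
    """Hypothetical stand-in mirroring an abstract run() contract."""

    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id

    @abstractmethod
    def run(self, input_data):
        raise NotImplementedError


class EchoAgent(SyncAgentBase):
    def run(self, input_data):
        return f"{self.agent_id}: {input_data}"


class IncompleteAgent(SyncAgentBase):
    pass  # run() is deliberately missing


print(EchoAgent("echo").run("hi"))

try:
    IncompleteAgent("broken")
except TypeError as exc:
    print("cannot instantiate:", type(exc).__name__)
```

The `NotImplementedError` in the base method is only reached if a subclass explicitly calls `super().run(...)`.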