orka.agents.base_agent module

Agent Base Classes

This module defines the foundation for all OrKa agents - the processing units that transform inputs into outputs within orchestrated workflows.

Agent Architecture

OrKa provides two agent patterns to support different implementation needs:

Modern Async Pattern (BaseAgent) - Full async/await support for concurrent execution - Structured output handling with automatic error wrapping - Built-in timeout and concurrency control - Lifecycle hooks for initialization and cleanup - Context-aware execution with trace information

Legacy Sync Pattern (LegacyBaseAgent) - Simple synchronous execution model - Compatible with existing agent implementations - Direct result return without output wrapping - Backward compatibility for older agents

Core Concepts

Agent Lifecycle: 1. Initialization: Set up resources and validate configuration 2. Execution: Process inputs with context awareness 3. Result Handling: Structure outputs for downstream processing 4. Cleanup: Release resources and maintain system health

Context Management: - Agents receive context dictionaries containing input data and metadata - Trace IDs are automatically added for debugging and monitoring - Previous outputs from other agents are available in the context - Error information is captured and structured for debugging

Concurrency Control: - Built-in concurrency manager limits parallel executions - Configurable timeout handling prevents hanging operations - Thread-safe execution for multi-agent workflows - Resource pooling for efficient memory usage

Implementation Patterns

Modern Agent Example: ```python from orka.agents.base_agent import BaseAgent

class MyModernAgent(BaseAgent):
async def _run_impl(self, ctx):

input_data = ctx.get(“input”) # Process input asynchronously result = await self.process_async(input_data) return result

```

Legacy Agent Example: ```python from orka.agents.base_agent import LegacyBaseAgent

class MyLegacyAgent(LegacyBaseAgent):
def run(self, input_data):

# Simple synchronous processing return self.process_sync(input_data)

```

Error Handling

Modern Agents: - Exceptions are automatically caught and wrapped in Output objects - Error details are preserved for debugging - Status indicators show success/failure state - Metadata includes agent identification

Legacy Agents: - Exceptions propagate directly to the orchestrator - Simple error handling for backward compatibility - Direct return values without wrapping

Integration Features

Registry Integration: - Agents can access shared resources through the registry - Dependency injection for memory, embedders, and other services - Lazy initialization of expensive resources

Orchestrator Integration: - Agents are automatically discovered and instantiated - Configuration is passed through constructor parameters - Results flow seamlessly between agents in workflows

Monitoring and Debugging: - Automatic trace ID generation for request tracking - Execution timing and performance metrics - Comprehensive logging for troubleshooting

class orka.agents.base_agent.BaseAgent(agent_id: str, registry: Registry | None = None, prompt: str | None = None, queue: list[str] | None = None, timeout: float | None = 30.0, max_concurrency: int = 10, **kwargs)[source]

Bases: object

🧠 Foundation for all OrKa agents - the cognitive building blocks of your workflows.

What makes agents special: - Intelligent Processing: Transform raw inputs into structured, meaningful outputs - Context Preservation: Maintain conversation flow and processing history - Async Performance: Non-blocking execution with timeout and concurrency control - Error Intelligence: Graceful failure handling with detailed diagnostics - Resource Management: Automatic initialization and cleanup of dependencies

Agent Lifecycle: 1. Initialization: Set up resources and validate configuration 2. Execution: Process inputs with full context awareness 3. Result Handling: Structure outputs for downstream processing 4. Cleanup: Release resources and maintain system health

Perfect for building: - Multi-step reasoning workflows - Intelligent content processing pipelines - Context-aware conversational systems - Fault-tolerant distributed AI applications

Example Usage: ```python # Create a specialized agent classifier = OpenAIClassificationAgent(

agent_id=”intent_classifier”, options=[“question”, “request”, “complaint”], prompt=”Classify user intent: {{ input }}”

)

# Execute with context result = await classifier.run({

“input”: “How do I reset my password?”, “user_id”: “user123”, “session_context”: previous_interactions

})

__init__(agent_id: str, registry: Registry | None = None, prompt: str | None = None, queue: list[str] | None = None, timeout: float | None = 30.0, max_concurrency: int = 10, **kwargs)[source]

Initialize the base agent with common properties.

Parameters:
  • agent_id (str) – Unique identifier for the agent

  • registry (Registry, optional) – Resource registry for dependency injection

  • prompt (str, optional) – Prompt or instruction for the agent (legacy)

  • queue (List[str], optional) – Queue of agents or nodes (legacy)

  • timeout (Optional[float]) – Maximum execution time in seconds

  • max_concurrency (int) – Maximum number of concurrent executions

  • **kwargs – Additional parameters specific to the agent type

async initialize() None[source]

Initialize the agent and its resources.

This method is called automatically before the first execution and should be overridden by derived classes to set up any required resources.

async run(ctx: Context | Any) Output | Any[source]

Run the agent with the given context.

This method handles the execution workflow including: - Lazy initialization of the agent - Adding trace information to the context - Managing concurrency and timeouts - Standardizing error handling and result formatting

Parameters:

ctx – The execution context containing input and metadata. Can be a Context object for modern agents or any input for legacy agents.

Returns:

Standardized output for modern agents or direct result for legacy agents

Return type:

Output or Any

async cleanup() None[source]

Clean up agent resources.

This method should be called when the agent is no longer needed to release any resources it may be holding, such as network connections, file handles, or memory.

__repr__()[source]

Return a string representation of the agent.

Returns:

String representation showing agent class, ID, and queue.

Return type:

str

class orka.agents.base_agent.LegacyBaseAgent(agent_id, prompt, queue, **kwargs)[source]

Bases: ABC, BaseAgent

Abstract base class for legacy agents in the OrKa framework. Provides compatibility with the older synchronous agent pattern.

New agent implementations should use BaseAgent directly with async methods. This class exists only for backward compatibility.

__init__(agent_id, prompt, queue, **kwargs)[source]

Initialize the legacy base agent.

Parameters:
  • agent_id (str) – Unique identifier for the agent.

  • prompt (str) – Prompt or instruction for the agent.

  • queue (list) – Queue of agents or nodes to be processed.

  • **kwargs – Additional parameters specific to the agent type.

abstractmethod run(input_data)[source]

Abstract method to run the agent’s reasoning process. Must be implemented by all concrete agent classes.

Parameters:

input_data – Input data for the agent to process.

Returns:

The result of the agent’s processing.

Raises:

NotImplementedError – If not implemented by a subclass.