orka.orchestrator.execution_engine module
Execution Engine
The ExecutionEngine is the core component responsible for coordinating and executing multi-agent workflows within the OrKa orchestration framework.
Core Responsibilities
Agent Coordination:
- Sequential execution of agents based on configuration
- Context propagation between agents with previous outputs
- Dynamic queue management for workflow control
- Error handling and retry logic with exponential backoff

Execution Patterns:
- Sequential Processing: Default execution pattern where agents run one after another
- Parallel Execution: Fork/join patterns for concurrent agent execution
- Conditional Branching: Router nodes for dynamic workflow paths
- Memory Operations: Integration with memory nodes for data persistence

Error Management:
- Comprehensive error tracking and telemetry collection
- Automatic retry with configurable maximum attempts
- Graceful degradation and fallback strategies
- Detailed error reporting and recovery actions
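The retry behavior listed above (a configurable maximum number of attempts with exponential backoff) can be sketched as follows. This is a minimal illustration, not the engine's actual code: `run_with_retry` and its parameters `max_attempts`, `base_delay` and `max_delay` are hypothetical names chosen for the example.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_retry(
    call: Callable[[], Awaitable[Any]],
    max_attempts: int = 3,      # hypothetical parameter name
    base_delay: float = 0.5,    # hypothetical parameter name
    max_delay: float = 8.0,     # hypothetical parameter name
) -> Any:
    """Await call(), retrying failures with exponentially growing delays."""
    for attempt in range(1, max_attempts + 1):
        try:
            return await call()
        except Exception:
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the last error
            # Delay doubles per attempt (base, 2*base, 4*base, ...), capped at max_delay.
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            await asyncio.sleep(delay)
```

Capping the delay keeps a long run of failures from producing unbounded waits; whether the real engine caps its backoff is not stated in this page.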
Architecture Details
Execution Flow:
1. Queue Processing: Agents are processed from the configured queue
2. Context Building: Input data and previous outputs are combined into a payload
3. Agent Execution: Individual agents are executed with full context
4. Result Processing: Outputs are captured and added to the execution history
5. Queue Management: The next agents are determined based on results
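The five steps above can be sketched as a small loop. This is a simplified, synchronous illustration under stated assumptions: the payload keys `input` and `previous_outputs`, the `next_agents` result key, and the function `run_workflow` are hypothetical and only stand in for whatever the engine uses internally.

```python
from collections import deque
from typing import Any, Callable, Dict, List

# Hypothetical agent type: takes a payload dict, returns a result dict.
Agent = Callable[[Dict[str, Any]], Dict[str, Any]]


def run_workflow(
    agents: Dict[str, Agent], order: List[str], input_data: Any
) -> List[Dict[str, Any]]:
    queue = deque(order)  # 1. queue processing
    previous_outputs: Dict[str, Any] = {}
    history: List[Dict[str, Any]] = []
    step = 0
    while queue:
        agent_id = queue.popleft()
        step += 1
        # 2. context building: original input plus all earlier outputs
        payload = {"input": input_data, "previous_outputs": dict(previous_outputs)}
        # 3. agent execution
        result = agents[agent_id](payload)
        # 4. result processing
        previous_outputs[agent_id] = result
        history.append({"step": step, "agent_id": agent_id, "result": result})
        # 5. queue management: a result may name agents to run next (assumed key)
        next_agents = result.get("next_agents") or []
        queue.extendleft(reversed(next_agents))  # prepend, keeping their order
    return history
```

Prepending the routed agents means a router's choice runs before anything still waiting in the queue; the real engine may order them differently.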
Context Management:
- Input data is preserved throughout the workflow
- Previous outputs from all agents are available to subsequent agents
- Execution metadata (timestamps, step indices) is tracked
- Error context is maintained for debugging and recovery

Concurrency Handling:
- Thread pool executor for parallel agent execution
- Fork group management for coordinated parallel operations
- Async/await patterns for non-blocking operations
- Resource pooling for efficient memory usage
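As an illustration of the fork/join idea, the sketch below runs a group of synchronous agents in a thread pool and passes the collected results to a join step. `run_fork_group` and the `join` callback are hypothetical names; the engine's own fork group management is not shown on this page.

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Dict

SyncAgent = Callable[[dict], Any]  # hypothetical agent type for this sketch


async def run_fork_group(
    group: Dict[str, SyncAgent],
    payload: dict,
    join: Callable[[Dict[str, Any]], Any],
) -> Any:
    """Run every agent in the group concurrently, then join the results."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=max(1, len(group))) as pool:
        # Each sync agent runs in a worker thread, so the event loop stays free.
        futures = [loop.run_in_executor(pool, agent, payload) for agent in group.values()]
        results = await asyncio.gather(*futures)
    # Results come back in the same order as the group's agents.
    return join(dict(zip(group, results)))
```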
Implementation Features
Agent Execution:
- Support for both sync and async agent implementations
- Automatic detection of agent execution patterns
- Timeout handling with configurable limits
- Resource cleanup after agent completion
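One common way to support both sync and async agents with a timeout is shown below. It is a sketch using only the standard library (`inspect.iscoroutinefunction`, `asyncio.to_thread`, `asyncio.wait_for`); `execute_agent` and its `timeout` parameter are hypothetical, and the engine may detect and bound agents differently.

```python
import asyncio
import inspect
from typing import Any, Callable


async def execute_agent(
    agent: Callable[[dict], Any], payload: dict, timeout: float = 30.0
) -> Any:
    """Run a sync or async agent, failing if it exceeds the timeout."""
    if inspect.iscoroutinefunction(agent):
        pending = agent(payload)  # async agent: await it directly
    else:
        # Sync agent: run in a worker thread (asyncio.to_thread needs Python 3.9+).
        pending = asyncio.to_thread(agent, payload)
    # Raises asyncio.TimeoutError when the limit is hit. Note that a timed-out
    # sync agent keeps running in its thread; only the wait is abandoned.
    return await asyncio.wait_for(pending, timeout=timeout)
```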
Memory Integration:
- Automatic logging of agent execution events
- Memory backend integration for persistent storage
- Context preservation across workflow steps
- Trace ID propagation for debugging
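To make event logging and trace ID propagation concrete, here is a sketch in which one trace ID is generated per run and attached to every logged event. `InMemoryLog`, `run_traced`, and the event field names are all hypothetical; a real memory backend would persist the events rather than keep them in a list.

```python
import time
import uuid
from typing import Any, Callable, Dict, List


class InMemoryLog:
    """Hypothetical stand-in for a memory backend: keeps events in a list."""

    def __init__(self) -> None:
        self.events: List[Dict[str, Any]] = []

    def log(self, trace_id: str, agent_id: str, event_type: str, payload: Any) -> None:
        self.events.append({
            "trace_id": trace_id,
            "agent_id": agent_id,
            "event_type": event_type,
            "timestamp": time.time(),
            "payload": payload,
        })


def run_traced(
    agents: Dict[str, Callable[[dict], Any]], input_data: Any, memory: InMemoryLog
) -> str:
    """Run agents in order, logging one event per agent under a shared trace ID."""
    trace_id = str(uuid.uuid4())  # one ID for the whole run
    previous_outputs: Dict[str, Any] = {}
    for agent_id, agent in agents.items():
        payload = {
            "input": input_data,
            "previous_outputs": dict(previous_outputs),
            "trace_id": trace_id,
        }
        result = agent(payload)
        previous_outputs[agent_id] = result
        memory.log(trace_id, agent_id, "agent_completed", result)
    return trace_id
```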
Error Handling:
- Exception capture and structured error reporting
- Retry logic with exponential backoff
- Error telemetry collection for monitoring
- Graceful failure recovery

Performance Optimization:
- Efficient context building and propagation
- Minimal memory overhead for large workflows
- Optimized queue processing algorithms
- Resource pooling for external connections
Execution Patterns
Sequential Execution:
```yaml
orchestrator:
  strategy: sequential
  agents: [classifier, router, processor, responder]
```

Parallel Execution:
```yaml
orchestrator:
  strategy: parallel
  fork_groups:
    - agents: [validator_1, validator_2, validator_3]
      join_agent: aggregator
```

Conditional Branching:
```yaml
agents:
  - id: router
    type: router
    conditions:
      - condition: "{{ classification == 'urgent' }}"
        next_agents: [urgent_handler]
      - condition: "{{ classification == 'normal' }}"
        next_agents: [normal_handler]
```
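The conditional branching configuration can be read as "take the `next_agents` of the first condition that holds". The sketch below illustrates that reading with plain Python predicates in place of the `{{ ... }}` template expressions; it does not evaluate templates, and `route` and the first-match rule are assumptions made for the example.

```python
from typing import Any, Callable, Dict, List, Tuple

# Hypothetical representation: (predicate over the context, agents to run next).
Condition = Tuple[Callable[[Dict[str, Any]], bool], List[str]]


def route(conditions: List[Condition], context: Dict[str, Any]) -> List[str]:
    """Return next_agents of the first condition that holds, else an empty list."""
    for predicate, next_agents in conditions:
        if predicate(context):
            return next_agents
    return []


# Mirrors the two conditions in the router configuration.
conditions: List[Condition] = [
    (lambda ctx: ctx.get("classification") == "urgent", ["urgent_handler"]),
    (lambda ctx: ctx.get("classification") == "normal", ["normal_handler"]),
]
```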
Integration Points
Memory System:
- Automatic event logging for all agent executions
- Context preservation in memory backend
- Trace ID propagation for request tracking
- Performance metrics collection

Error Handling:
- Structured error reporting with context
- Retry mechanisms with configurable policies
- Error telemetry for monitoring and alerting
- Recovery action recommendations

Monitoring:
- Real-time execution metrics
- Agent performance tracking
- Resource usage monitoring
- Error rate and pattern analysis
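Per-agent performance tracking and error rates of the kind listed under Monitoring could be collected along these lines. `AgentMetrics` is a hypothetical helper written for this example and is not the `MetricsCollector` class the engine inherits from.

```python
import time
from collections import defaultdict
from typing import Any, Callable, DefaultDict


class AgentMetrics:
    """Hypothetical per-agent counters: calls, errors, and total run time."""

    def __init__(self) -> None:
        self.calls: DefaultDict[str, int] = defaultdict(int)
        self.errors: DefaultDict[str, int] = defaultdict(int)
        self.total_seconds: DefaultDict[str, float] = defaultdict(float)

    def timed(self, agent_id: str, agent: Callable[[dict], Any], payload: dict) -> Any:
        """Run the agent, recording its duration and whether it raised."""
        start = time.perf_counter()
        self.calls[agent_id] += 1
        try:
            return agent(payload)
        except Exception:
            self.errors[agent_id] += 1
            raise
        finally:
            # Runs on success and on failure, so failed calls are timed too.
            self.total_seconds[agent_id] += time.perf_counter() - start

    def error_rate(self, agent_id: str) -> float:
        calls = self.calls[agent_id]
        return self.errors[agent_id] / calls if calls else 0.0
```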
- class orka.orchestrator.execution_engine.ExecutionEngineProtocol(config_path: str)
Bases: OrchestratorBase
Protocol defining required attributes for ExecutionEngine type variable.
- agents: Dict[str, Any]
- class orka.orchestrator.execution_engine.ExecutionEngine(*args: Any, **kwargs: Any)
Bases: OrchestratorBase, PromptRenderer, ErrorHandler, MetricsCollector
ExecutionEngine coordinates complex multi-agent workflows within the OrKa framework.
Core Features:
- Agent execution with precise coordination
- Rich context flow across workflow steps
- Fault tolerance with automatic recovery
- Real-time optimization and resource management
- Scalable architecture for distributed execution
Execution Patterns:

Sequential Processing:
```yaml
orchestrator:
  strategy: sequential
  agents: [classifier, router, processor, responder]
```

Parallel Processing:
```yaml
orchestrator:
  strategy: parallel
  agents: [validator_1, validator_2, validator_3]
```

Decision Tree:
```yaml
orchestrator:
  strategy: decision-tree
  agents: [classifier, router, [path_a, path_b], aggregator]
```
Advanced Features:
- Intelligent retry logic with exponential backoff
- Real-time monitoring and performance metrics
- Resource management and connection pooling
- Production-ready distributed capabilities

Use Cases:
- Multi-step AI reasoning workflows
- High-throughput content processing pipelines
- Real-time decision systems with complex branching
- Fault-tolerant distributed AI applications
- async run(input_data: Any, return_logs: bool = False) → Any
Execute the orchestrator with the given input data.
- Parameters:
input_data – The input data for the orchestrator
return_logs – If True, return full logs; if False, return final response (default: False)
- Returns:
Either the logs array or the final response based on return_logs parameter
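The effect of `return_logs` can be illustrated with a stand-in class that mirrors only the documented `run()` signature. `StubEngine` and its canned log entries are hypothetical, and treating the last agent's result as the "final response" is an assumption; consult the source for what the real engine returns.

```python
import asyncio
from typing import Any, Dict, List


class StubEngine:
    """Hypothetical stand-in that mirrors only the documented run() signature."""

    async def run(self, input_data: Any, return_logs: bool = False) -> Any:
        # Canned entries standing in for real agent executions.
        logs: List[Dict[str, Any]] = [
            {"agent_id": "classifier", "result": "urgent"},
            {"agent_id": "responder", "result": f"Handled: {input_data}"},
        ]
        # Documented contract: full logs if return_logs is True, else the final response.
        return logs if return_logs else logs[-1]["result"]


async def main() -> None:
    engine = StubEngine()
    print(await engine.run("ticket"))                    # final response only
    print(await engine.run("ticket", return_logs=True))  # full logs list


if __name__ == "__main__":
    asyncio.run(main())
```

Because `run` is a coroutine, callers outside an event loop need `asyncio.run(...)` or an equivalent entry point.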