orka.orchestrator.execution_engine module

Execution Engine

The ExecutionEngine is the core component responsible for coordinating and executing multi-agent workflows within the OrKa orchestration framework.

Core Responsibilities

Agent Coordination:
- Sequential execution of agents based on configuration
- Context propagation between agents with previous outputs
- Dynamic queue management for workflow control
- Error handling and retry logic with exponential backoff

Execution Patterns:
- Sequential Processing: Default execution pattern where agents run one after another
- Parallel Execution: Fork/join patterns for concurrent agent execution
- Conditional Branching: Router nodes for dynamic workflow paths
- Memory Operations: Integration with memory nodes for data persistence

Error Management:
- Comprehensive error tracking and telemetry collection
- Automatic retry with configurable maximum attempts
- Graceful degradation and fallback strategies
- Detailed error reporting and recovery actions

Architecture Details

Execution Flow:
1. Queue Processing: Agents are processed from the configured queue
2. Context Building: Input data and previous outputs are combined into a payload
3. Agent Execution: Individual agents are executed with full context
4. Result Processing: Outputs are captured and added to execution history
5. Queue Management: Next agents are determined based on results
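
The flow above maps onto a simple queue-driven loop. The sketch below is illustrative only: the payload keys and the determine_next_agents() routing hook are assumptions, not the actual ExecutionEngine internals.

```python
# Illustrative sketch of the queue-driven execution flow; not OrKa's real code.
from collections import deque
from typing import Any, Dict, List


def determine_next_agents(agent_id: str, result: Any) -> List[str]:
    """Hypothetical routing hook; a real router would inspect the result."""
    return []


async def run_queue(agents: Dict[str, Any], queue: List[str], input_data: Any) -> List[Dict[str, Any]]:
    logs: List[Dict[str, Any]] = []
    previous_outputs: Dict[str, Any] = {}
    pending = deque(queue)                                        # 1. queue processing
    step = 0
    while pending:
        agent_id = pending.popleft()
        payload = {                                               # 2. context building
            "input": input_data,
            "previous_outputs": dict(previous_outputs),
            "step_index": step,
        }
        result = await agents[agent_id].run(payload)              # 3. agent execution
        previous_outputs[agent_id] = result                       # 4. result processing
        logs.append({"agent_id": agent_id, "step": step, "result": result})
        pending.extend(determine_next_agents(agent_id, result))   # 5. queue management
        step += 1
    return logs
```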

Context Management:
- Input data is preserved throughout the workflow
- Previous outputs from all agents are available to subsequent agents
- Execution metadata (timestamps, step indices) is tracked
- Error context is maintained for debugging and recovery
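
As a concrete illustration of the context described above, a per-step payload might look like the following; the key names and values are assumptions for illustration, not the engine's documented schema.

```python
# Hypothetical shape of the context payload handed to each agent.
payload = {
    "input": "Summarise the incident report",                 # preserved throughout the workflow
    "previous_outputs": {                                      # every prior agent's result is visible
        "classifier": {"label": "incident", "confidence": 0.93},
        "router": {"next_agents": ["processor"]},
    },
    "step_index": 2,                                           # execution metadata
    "timestamp": "2024-01-01T00:00:00Z",
    "trace_id": "run-1234",                                    # kept for debugging and recovery
}
```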

Concurrency Handling:
- Thread pool executor for parallel agent execution
- Fork group management for coordinated parallel operations
- Async/await patterns for non-blocking operations
- Resource pooling for efficient memory usage
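
A minimal fork/join sketch using asyncio.gather is shown below; it assumes each agent exposes an awaitable run() method and is not the engine's actual implementation.

```python
import asyncio
from typing import Any, Dict, List


async def fork_join(agents: List[Any], payload: Dict[str, Any]) -> List[Any]:
    """Run a fork group concurrently and join on all results."""
    # Each agent becomes its own task; gather() is the join point for the group.
    tasks = [asyncio.create_task(agent.run(payload)) for agent in agents]
    return await asyncio.gather(*tasks)
```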

Implementation Features

Agent Execution:
- Support for both sync and async agent implementations
- Automatic detection of agent execution patterns
- Timeout handling with configurable limits
- Resource cleanup after agent completion
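
Sync/async detection can be expressed with inspect plus a thread-pool fallback and asyncio.wait_for for timeouts. The sketch below is an assumption about how this could look, not the ExecutionEngine's code.

```python
import asyncio
import inspect
from typing import Any


async def execute_agent(agent: Any, payload: dict, timeout: float = 30.0) -> Any:
    """Run an agent whether its run() method is sync or async."""
    if inspect.iscoroutinefunction(agent.run):
        # Async agent: await directly, bounded by a configurable timeout.
        return await asyncio.wait_for(agent.run(payload), timeout=timeout)
    # Sync agent: off-load to the default thread pool so the event loop stays free.
    loop = asyncio.get_running_loop()
    return await asyncio.wait_for(
        loop.run_in_executor(None, agent.run, payload), timeout=timeout
    )
```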

Memory Integration:
- Automatic logging of agent execution events
- Memory backend integration for persistent storage
- Context preservation across workflow steps
- Trace ID propagation for debugging

Error Handling:
- Exception capture and structured error reporting
- Retry logic with exponential backoff
- Error telemetry collection for monitoring
- Graceful failure recovery
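
A retry loop with exponential backoff along the lines described above could look like this; max_retries and base_delay are illustrative defaults, not OrKa's configured values.

```python
import asyncio
from typing import Any, Awaitable, Callable


async def run_with_retry(
    step: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> Any:
    """Retry an awaitable step with exponential backoff (1s, 2s, 4s, ...)."""
    for attempt in range(max_retries + 1):
        try:
            return await step()
        except Exception:
            if attempt == max_retries:
                raise  # surface the error after the final attempt
            await asyncio.sleep(base_delay * (2 ** attempt))
```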

Performance Optimization:
- Efficient context building and propagation
- Minimal memory overhead for large workflows
- Optimized queue processing algorithms
- Resource pooling for external connections

Execution Patterns

Sequential Execution:

```yaml
orchestrator:
  strategy: sequential
  agents: [classifier, router, processor, responder]
```

Parallel Execution:

```yaml
orchestrator:
  strategy: parallel
  fork_groups:
    - agents: [validator_1, validator_2, validator_3]
      join_agent: aggregator
```

Conditional Branching:

```yaml
agents:
  - id: router
    type: router
    conditions:
      - condition: "{{ classification == 'urgent' }}"
        next_agents: [urgent_handler]
      - condition: "{{ classification == 'normal' }}"
        next_agents: [normal_handler]
```

Integration Points

Memory System:
- Automatic event logging for all agent executions
- Context preservation in the memory backend
- Trace ID propagation for request tracking
- Performance metrics collection

Error Handling:
- Structured error reporting with context
- Retry mechanisms with configurable policies
- Error telemetry for monitoring and alerting
- Recovery action recommendations

Monitoring:
- Real-time execution metrics
- Agent performance tracking
- Resource usage monitoring
- Error rate and pattern analysis

class orka.orchestrator.execution_engine.ExecutionEngineProtocol(config_path: str)[source]

Bases: OrchestratorBase

Protocol defining required attributes for ExecutionEngine type variable.

agents: Dict[str, Any]
class orka.orchestrator.execution_engine.ExecutionEngine(*args: Any, **kwargs: Any)[source]

Bases: OrchestratorBase, PromptRenderer, ErrorHandler, MetricsCollector

ExecutionEngine coordinates complex multi-agent workflows within the OrKa framework.

Core Features:
- Agent execution with precise coordination
- Rich context flow across workflow steps
- Fault tolerance with automatic recovery
- Real-time optimization and resource management
- Scalable architecture for distributed execution

Execution Patterns:

Sequential Processing:

```yaml
orchestrator:
  strategy: sequential
  agents: [classifier, router, processor, responder]
```

Parallel Processing:

```yaml
orchestrator:
  strategy: parallel
  agents: [validator_1, validator_2, validator_3]
```

Decision Tree:

```yaml
orchestrator:
  strategy: decision-tree
  agents: [classifier, router, [path_a, path_b], aggregator]
```

Advanced Features:
- Intelligent retry logic with exponential backoff
- Real-time monitoring and performance metrics
- Resource management and connection pooling
- Production-ready distributed capabilities

Use Cases:
- Multi-step AI reasoning workflows
- High-throughput content processing pipelines
- Real-time decision systems with complex branching
- Fault-tolerant distributed AI applications

async run(input_data: Any, return_logs: bool = False) → Any[source]

Execute the orchestrator with the given input data.

Parameters:
  • input_data – The input data for the orchestrator

  • return_logs – If True, return full logs; if False, return final response (default: False)

Returns:

Either the logs array or the final response, depending on the return_logs parameter.
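
A minimal usage sketch follows. It assumes the concrete orchestrator class that composes ExecutionEngine is importable as orka.orchestrator.Orchestrator and takes a config path; the config file name and inputs are placeholders.

```python
import asyncio

from orka.orchestrator import Orchestrator  # assumed entry point composing ExecutionEngine


async def main() -> None:
    orchestrator = Orchestrator("workflow.yml")  # placeholder config path
    # Default: only the final response is returned.
    answer = await orchestrator.run("Classify this support ticket")
    print(answer)
    # With return_logs=True the full execution log array is returned instead.
    logs = await orchestrator.run("Classify this support ticket", return_logs=True)
    print(len(logs), "log entries")


asyncio.run(main())
```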

async run_parallel_agents(agent_ids: List[str], fork_group_id: str, input_data: Any, previous_outputs: Dict[str, Any]) → List[Dict[str, Any]][source]

Execute the given agents in parallel as a fork group, with enhanced error handling and logging. Returns a list of log entries, one per forked agent.
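
A hedged usage sketch (inside an async context); the agent ids, fork group id, and input are placeholders, and engine stands for an initialised orchestrator instance.

```python
# Placeholder ids; `engine` is assumed to be an initialised ExecutionEngine-based orchestrator.
results = await engine.run_parallel_agents(
    agent_ids=["validator_1", "validator_2", "validator_3"],
    fork_group_id="fork_validators",
    input_data="document to validate",
    previous_outputs={},
)
for entry in results:
    print(entry)  # one log entry per forked agent
```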

enqueue_fork(agent_ids: List[str], fork_group_id: str) → None[source]

Add agents to the fork queue for processing.