orka.orchestrator.execution_engine module

Execution Engine

The ExecutionEngine is the core component responsible for coordinating and executing multi-agent workflows within the OrKa orchestration framework.

Core Responsibilities

Agent Coordination:

- Sequential execution of agents based on configuration
- Context propagation between agents with previous outputs
- Dynamic queue management for workflow control
- Error handling and retry logic with exponential backoff

Execution Patterns:

- Sequential Processing: Default execution pattern where agents run one after another
- Parallel Execution: Fork/join patterns for concurrent agent execution
- Conditional Branching: Router nodes for dynamic workflow paths
- Memory Operations: Integration with memory nodes for data persistence

Error Management:

- Comprehensive error tracking and telemetry collection
- Automatic retry with configurable maximum attempts
- Graceful degradation and fallback strategies
- Detailed error reporting and recovery actions

Architecture Details

Execution Flow:

1. Queue Processing: Agents are processed from the configured queue
2. Context Building: Input data and previous outputs are combined into a payload
3. Agent Execution: Individual agents are executed with full context
4. Result Processing: Outputs are captured and added to execution history
5. Queue Management: Next agents are determined based on results
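The five-step flow above can be sketched as a minimal queue-processing loop. This is an illustrative reduction, not the actual OrKa implementation; names such as `run_workflow` and the `run_agent` callback are hypothetical:

```python
from collections import deque


def run_workflow(queue, input_data, run_agent):
    """Minimal sketch of the five-step flow: dequeue an agent, build its
    context payload, execute it, record the result, and decide what runs next."""
    pending = deque(queue)                    # 1. queue processing
    previous_outputs = {}
    history = []
    step = 0
    while pending:
        agent_id = pending.popleft()
        payload = {                           # 2. context building
            "input": input_data,
            "previous_outputs": dict(previous_outputs),
            "step_index": step,
        }
        result = run_agent(agent_id, payload)  # 3. agent execution
        previous_outputs[agent_id] = result    # 4. result processing
        history.append({"agent_id": agent_id, "result": result})
        # 5. queue management: agents may name their successors
        pending.extend(result.get("next_agents", []))
        step += 1
    return history
```

Note that each agent's payload carries a snapshot of all previous outputs, which is how later agents see the full workflow context.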

Context Management:

- Input data is preserved throughout the workflow
- Previous outputs from all agents are available to subsequent agents
- Execution metadata (timestamps, step indices) is tracked
- Error context is maintained for debugging and recovery

Concurrency Handling:

- Thread pool executor for parallel agent execution
- Fork group management for coordinated parallel operations
- Async/await patterns for non-blocking operations
- Resource pooling for efficient memory usage
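The fork/join pattern behind the thread-pool bullet can be sketched as follows; `run_fork_group` and the `run_agent` callback are illustrative names, not the OrKa API:

```python
from concurrent.futures import ThreadPoolExecutor


def run_fork_group(agent_ids, payload, run_agent, max_workers=4):
    """Fork: submit every agent in the group to a thread pool.
    Join: block until all complete and collect results keyed by agent id."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {aid: pool.submit(run_agent, aid, payload) for aid in agent_ids}
        return {aid: fut.result() for aid, fut in futures.items()}
```

The join step here is implicit: `fut.result()` blocks until each forked agent has finished, so the function returns only when the whole group is done.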

Implementation Features

Agent Execution:

- Support for both sync and async agent implementations
- Automatic detection of agent execution patterns
- Timeout handling with configurable limits
- Resource cleanup after agent completion
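Automatic sync/async detection with a timeout is commonly done with `inspect.iscoroutinefunction`. A minimal sketch (the `execute_agent` name is illustrative):

```python
import asyncio
import inspect


async def execute_agent(agent_fn, payload, timeout=30.0):
    """Run an agent callable whether it is sync or async, with a timeout.

    Async agents are awaited directly; sync agents are pushed onto the
    default executor so they do not block the event loop.
    """
    if inspect.iscoroutinefunction(agent_fn):
        coro = agent_fn(payload)
    else:
        loop = asyncio.get_running_loop()
        coro = loop.run_in_executor(None, agent_fn, payload)
    return await asyncio.wait_for(coro, timeout=timeout)
```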

Memory Integration:

- Automatic logging of agent execution events
- Memory backend integration for persistent storage
- Context preservation across workflow steps
- Trace ID propagation for debugging

Error Handling:

- Exception capture and structured error reporting
- Retry logic with exponential backoff
- Error telemetry collection for monitoring
- Graceful failure recovery
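Retry with exponential backoff can be sketched as below; the function name, defaults, and jitter are illustrative assumptions, not OrKa's actual retry policy:

```python
import asyncio
import random


async def run_with_retry(agent_fn, payload, max_attempts=3, base_delay=0.5):
    """Retry a failing agent with exponential backoff plus a little jitter.

    The delay doubles on each attempt (base, 2*base, 4*base, ...); the final
    failure is re-raised so callers can apply fallback strategies.
    """
    for attempt in range(max_attempts):
        try:
            return await agent_fn(payload)
        except Exception:
            if attempt == max_attempts - 1:
                raise
            delay = base_delay * (2 ** attempt) + random.uniform(0, 0.1)
            await asyncio.sleep(delay)
```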

Performance Optimization:

- Efficient context building and propagation
- Minimal memory overhead for large workflows
- Optimized queue processing algorithms
- Resource pooling for external connections

Execution Patterns

Sequential Execution:

```yaml
orchestrator:
  strategy: sequential
  agents: [classifier, processor, responder]
```

Parallel Execution:

```yaml
orchestrator:
  strategy: parallel
  fork_groups:
    - agents: [validator_1, validator_2, validator_3]
      join_agent: aggregator
```

Conditional Branching:

```yaml
agents:
  - id: router
    type: router
    conditions:
      - condition: "{{ classification == 'urgent' }}"
        next_agents: [urgent_handler]
      - condition: "{{ classification == 'normal' }}"
        next_agents: [normal_handler]
```

Integration Points

Memory System:

- Automatic event logging for all agent executions
- Context preservation in memory backend
- Trace ID propagation for request tracking
- Performance metrics collection

Error Handling:

- Structured error reporting with context
- Retry mechanisms with configurable policies
- Error telemetry for monitoring and alerting
- Recovery action recommendations

Monitoring:

- Real-time execution metrics
- Agent performance tracking
- Resource usage monitoring
- Error rate and pattern analysis

class orka.orchestrator.execution_engine.ExecutionEngine[source]

Bases: object

🎼 The conductor of your AI orchestra - coordinates complex multi-agent workflows.

What makes execution intelligent:

- Perfect Timing: Orchestrates agent execution with precise coordination
- Context Flow: Maintains rich context across all workflow steps
- Fault Tolerance: Graceful handling of failures with automatic recovery
- Performance Intelligence: Real-time optimization and resource management
- Scalable Architecture: From single-threaded to distributed execution

Execution Patterns:

1. Sequential Processing (most common):

```yaml
orchestrator:
  strategy: sequential
  agents: [classifier, router, processor, responder]
  # Each agent receives full context from all previous steps
```

2. Parallel Processing (for speed):

```yaml
orchestrator:
  strategy: parallel
  agents: [validator_1, validator_2, validator_3]
  # All agents run simultaneously, results aggregated
```

3. Decision Tree (for complex logic):

```yaml
orchestrator:
  strategy: decision-tree
  agents: [classifier, router, [path_a, path_b], aggregator]
  # Dynamic routing based on classification results
```

Advanced Features:

🔄 Intelligent Retry Logic:

- Exponential backoff for transient failures
- Context preservation across retry attempts
- Configurable retry policies per agent type
- Partial success handling for complex workflows

📊 Real-time Monitoring:

- Agent execution timing and performance metrics
- LLM token usage and cost tracking
- Memory usage and optimization insights
- Error pattern detection and alerting

⚡ Resource Management:

- Connection pooling for external services
- Agent lifecycle management and cleanup
- Memory optimization for long-running workflows
- Graceful shutdown and resource release

🎯 Production Features:

- Distributed execution across multiple workers
- Load balancing and auto-scaling capabilities
- Health checks and service discovery
- Comprehensive logging and audit trails

Perfect for:

- Multi-step AI reasoning workflows
- High-throughput content processing pipelines
- Real-time decision systems with complex branching
- Fault-tolerant distributed AI applications

async run(input_data)[source]

Main entry point for orchestrator execution. Creates an empty logs list and delegates to the comprehensive error-handling method.
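The delegation described above can be illustrated with a toy class; `EngineSketch` and `_run_with_error_handling` are hypothetical names standing in for the real engine internals:

```python
import asyncio


class EngineSketch:
    """Toy sketch of the run() entry point: start with an empty logs list,
    then delegate to an error-handling runner (names hypothetical)."""

    async def run(self, input_data):
        logs = []
        return await self._run_with_error_handling(input_data, logs)

    async def _run_with_error_handling(self, input_data, logs):
        try:
            logs.append({"event": "start", "input": input_data})
            # ... the real engine would process the agent queue here ...
            return logs
        except Exception as exc:
            logs.append({"event": "error", "error": str(exc)})
            raise
```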

async run_parallel_agents(agent_ids, fork_group_id, input_data, previous_outputs)[source]

Run multiple branches in parallel, with agents within each branch running sequentially. Returns a list of log entries for each forked agent.
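The shape of this method (concurrent branches, sequential agents within each branch) can be sketched with `asyncio.gather`; function and parameter names here are illustrative, not the actual signature internals:

```python
import asyncio


async def run_parallel_branches(branches, payload, run_agent):
    """Run each branch concurrently; within a branch, agents run in order,
    each seeing the outputs of earlier agents in that same branch."""

    async def run_branch(agent_ids):
        outputs = {}
        for aid in agent_ids:
            outputs[aid] = await run_agent(
                aid, {**payload, "previous_outputs": dict(outputs)}
            )
        return outputs

    # Branches are forked concurrently and joined when all complete.
    results = await asyncio.gather(*(run_branch(b) for b in branches))
    # Flatten into one (agent_id, result) log entry per forked agent.
    return [entry for branch in results for entry in branch.items()]
```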

enqueue_fork(agent_ids, fork_group_id)[source]

Add agents to the fork queue for processing.