orka.orchestrator package

Orchestrator Package

The orchestrator package contains the modular components that make up OrKa’s core orchestration engine. The orchestrator was designed with a modular architecture for specialized components while maintaining 100% backward compatibility.

Architecture Overview

The orchestrator uses a multiple inheritance composition pattern to combine specialized functionality from focused components:

Core Components

OrchestratorBase

Handles initialization, configuration loading, and basic setup

AgentFactory

Manages agent registry, instantiation, and the AGENT_TYPES mapping

ExecutionEngine

Contains the main execution loop, agent coordination, and workflow management

PromptRenderer

Handles Jinja2 template rendering and prompt formatting

ErrorHandler

Provides comprehensive error tracking, retry logic, and failure reporting

MetricsCollector

Collects LLM metrics, runtime information, and generates performance reports

Composition Strategy

The main Orchestrator class inherits from all components using multiple inheritance, ensuring that:

  1. Method Resolution Order is preserved for consistent behavior

  2. All functionality remains accessible through the same interface

  3. Zero breaking changes are introduced for existing code

  4. Internal modularity improves maintainability and testing

Usage Example

from orka.orchestrator import Orchestrator

# Initialize with YAML configuration
orchestrator = Orchestrator("workflow.yml")

# Run the workflow (uses all components seamlessly)
result = await orchestrator.run("input data")

Module Components

Available Modules:

  • base - Core initialization and configuration

  • agent_factory - Agent registry and instantiation

  • execution_engine - Main execution loop and coordination

  • prompt_rendering - Template processing and formatting

  • error_handling - Error tracking and retry logic

  • metrics - Performance metrics and reporting

Benefits of Modular Design

Maintainability

Each component has a single, focused responsibility

Testability

Components can be tested in isolation

Extensibility

New functionality can be added without affecting other components

Code Organization

Related functionality is grouped together logically

Backward Compatibility

Existing code continues to work without modification

class orka.orchestrator.AgentFactory(orchestrator_cfg: Dict[str, Any], agent_cfgs: List[Dict[str, Any]], memory: BaseMemoryLogger)[source]

Bases: object

Factory class for creating and initializing agents based on configuration.

class orka.orchestrator.ErrorHandler[source]

Bases: object

Handles error tracking, reporting, and recovery mechanisms.

step_index: Any
run_id: Any
error_telemetry: Any
memory: Any
class orka.orchestrator.ExecutionEngine(*args: Any, **kwargs: Any)[source]

Bases: OrchestratorBase, PromptRenderer, ErrorHandler, MetricsCollector

ExecutionEngine coordinates complex multi-agent workflows within the OrKa framework.

Core Features: - Agent execution with precise coordination - Rich context flow across workflow steps - Fault tolerance with automatic recovery - Real-time optimization and resource management - Scalable architecture for distributed execution

Execution Patterns:

Sequential Processing: ```yaml orchestrator:

strategy: sequential agents: [classifier, router, processor, responder]

```

Parallel Processing: ```yaml orchestrator:

strategy: parallel agents: [validator_1, validator_2, validator_3]

```

Decision Tree: ```yaml orchestrator:

strategy: decision-tree agents: [classifier, router, [path_a, path_b], aggregator]

```

Advanced Features: - Intelligent retry logic with exponential backoff - Real-time monitoring and performance metrics - Resource management and connection pooling - Production-ready distributed capabilities

Use Cases: - Multi-step AI reasoning workflows - High-throughput content processing pipelines - Real-time decision systems with complex branching - Fault-tolerant distributed AI applications

enqueue_fork(agent_ids: List[str], fork_group_id: str) None[source]

Add agents to the fork queue for processing.

async run(input_data: Any, return_logs: bool = False) Any[source]

Execute the orchestrator with the given input data.

Parameters:
  • input_data – The input data for the orchestrator

  • return_logs – If True, return full logs; if False, return final response (default: False)

Returns:

Either the logs array or the final response based on return_logs parameter

async run_parallel_agents(agent_ids: List[str], fork_group_id: str, input_data: Any, previous_outputs: Dict[str, Any]) List[Dict[str, Any]][source]

Enhanced parallel execution with better error handling and logging. Returns a list of log entries for each forked agent.

class orka.orchestrator.MetricsCollector[source]

Bases: object

Handles metrics collection, aggregation, and reporting.

static build_previous_outputs(logs: List[Dict[str, Any]]) Dict[str, Any][source]

Build a dictionary of previous agent outputs from the execution logs. Used to provide context to downstream agents.

run_id: Any
class orka.orchestrator.Orchestrator(config_path: str)[source]

Bases: ExecutionEngine, OrchestratorBase, AgentFactory, PromptRenderer, ErrorHandler, MetricsCollector

The Orchestrator is the core engine that loads a YAML configuration, instantiates agents and nodes, and manages the execution of the reasoning workflow. It supports parallelism, dynamic routing, and full trace logging.

This class now inherits from multiple mixins to provide all functionality while maintaining the same public interface.

__init__(config_path: str) None[source]

Initialize the Orchestrator with a YAML config file. Loads orchestrator and agent configs, sets up memory and fork management.

class orka.orchestrator.OrchestratorBase(config_path: str)[source]

Bases: object

Base orchestrator class that handles initialization and configuration.

This class provides the foundational infrastructure for the OrKa orchestration framework, including configuration loading, memory backend setup, and core state management. It is designed to be composed with other specialized classes through multiple inheritance.

The class automatically configures the appropriate backend based on environment variables and provides comprehensive error tracking capabilities for monitoring and debugging orchestration runs.

loader

Configuration file loader and validator

Type:

YAMLLoader

orchestrator_cfg

Orchestrator-specific configuration settings

Type:

dict

agent_cfgs

List of agent configuration objects

Type:

list

memory

Memory backend instance (Redis or RedisStack)

fork_manager

Fork group manager for parallel execution

queue

Current agent execution queue

Type:

list

run_id

Unique identifier for this orchestration run

Type:

str

step_index

Current step counter for traceability

Type:

int

error_telemetry

Comprehensive error tracking and metrics

Type:

dict

__init__(config_path: str) None[source]

Initialize the Orchestrator with a YAML config file.

Sets up all core infrastructure including configuration loading, memory backend selection, fork management, and error tracking systems.

Parameters:

config_path (str) – Path to the YAML configuration file

Environment Variables:

ORKA_MEMORY_BACKEND: Memory backend type (‘redis’ or ‘redisstack’, default: ‘redisstack’) ORKA_DEBUG_KEEP_PREVIOUS_OUTPUTS: Keep previous outputs for debugging (‘true’/’false’) REDIS_URL: Redis connection URL (default: ‘redis://localhost:6380/0’)

enqueue_fork(agent_ids: list[str], fork_group_id: str) None[source]

Enqueue a fork group for parallel execution.

class orka.orchestrator.PromptRenderer[source]

Bases: object

Handles prompt rendering and template processing using Jinja2.

This class provides methods for rendering dynamic prompts with context data, processing agent responses, and managing template-related operations within the orchestrator workflow.

The renderer supports complex template structures and provides robust error handling to ensure that template failures don’t interrupt workflow execution.

static normalize_bool(value)[source]

Normalize a value to boolean with support for complex agent responses.

This utility method handles the conversion of various data types to boolean values, with special support for complex agent response structures that may contain nested results.

Parameters:

value – The value to normalize (bool, str, dict, or other)

Returns:

The normalized boolean value

Return type:

bool

Supported Input Types:
  • bool: Returned as-is

  • str: ‘true’, ‘yes’ (case-insensitive) → True, others → False

  • dict: Extracts from ‘result’ or ‘response’ keys with recursive processing

  • other: Defaults to False

Example

# Simple cases
assert PromptRenderer.normalize_bool(True) == True
assert PromptRenderer.normalize_bool("yes") == True
assert PromptRenderer.normalize_bool("false") == False

# Complex agent response
response = {"result": {"response": "true"}}
assert PromptRenderer.normalize_bool(response) == True
render_prompt(template_str, payload)[source]

Render a Jinja2 template string with comprehensive error handling.

This method is the core template rendering functionality, taking a template string and context payload to produce a rendered prompt for agent execution.

Parameters:
  • template_str (str) – The Jinja2 template string to render

  • payload (dict) – Context data for template variable substitution

Returns:

The rendered template with variables substituted

Return type:

str

Raises:
  • ValueError – If template_str is not a string

  • jinja2.TemplateError – If template syntax is invalid

Example

template = "Hello {{ name }}, you have {{ count }} messages"
context = {"name": "Alice", "count": 5}
result = renderer.render_prompt(template, context)
# Returns: "Hello Alice, you have 5 messages"

Submodules