# Source code for orka.orchestrator.execution_engine

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-reasoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-reasoning

"""
Execution Engine
================

The ExecutionEngine is the core component responsible for coordinating and executing
multi-agent workflows within the OrKa orchestration framework.

Core Responsibilities
--------------------

**Agent Coordination:**
- Sequential execution of agents based on configuration
- Context propagation between agents with previous outputs
- Dynamic queue management for workflow control
- Error handling and retry logic with exponential backoff

**Execution Patterns:**
- **Sequential Processing**: Default execution pattern where agents run one after another
- **Parallel Execution**: Fork/join patterns for concurrent agent execution
- **Conditional Branching**: Router nodes for dynamic workflow paths
- **Memory Operations**: Integration with memory nodes for data persistence

**Error Management:**
- Comprehensive error tracking and telemetry collection
- Automatic retry with configurable maximum attempts
- Graceful degradation and fallback strategies
- Detailed error reporting and recovery actions

Architecture Details
-------------------

**Execution Flow:**
1. **Queue Processing**: Agents are processed from the configured queue
2. **Context Building**: Input data and previous outputs are combined into payload
3. **Agent Execution**: Individual agents are executed with full context
4. **Result Processing**: Outputs are captured and added to execution history
5. **Queue Management**: Next agents are determined based on results

**Context Management:**
- Input data is preserved throughout the workflow
- Previous outputs from all agents are available to subsequent agents
- Execution metadata (timestamps, step indices) is tracked
- Error context is maintained for debugging and recovery

**Concurrency Handling:**
- Thread pool executor for parallel agent execution
- Fork group management for coordinated parallel operations
- Async/await patterns for non-blocking operations
- Resource pooling for efficient memory usage

Implementation Features
----------------------

**Agent Execution:**
- Support for both sync and async agent implementations
- Automatic detection of agent execution patterns
- Timeout handling with configurable limits
- Resource cleanup after agent completion

**Memory Integration:**
- Automatic logging of agent execution events
- Memory backend integration for persistent storage
- Context preservation across workflow steps
- Trace ID propagation for debugging

**Error Handling:**
- Exception capture and structured error reporting
- Retry logic with exponential backoff
- Error telemetry collection for monitoring
- Graceful failure recovery

**Performance Optimization:**
- Efficient context building and propagation
- Minimal memory overhead for large workflows
- Optimized queue processing algorithms
- Resource pooling for external connections

Execution Patterns
-----------------

**Sequential Execution:**
```yaml
orchestrator:
  strategy: sequential
  agents: [classifier, router, processor, responder]
```

**Parallel Execution:**
```yaml
orchestrator:
  strategy: parallel
  fork_groups:
    - agents: [validator_1, validator_2, validator_3]
      join_agent: aggregator
```

**Conditional Branching:**
```yaml
agents:
  - id: router
    type: router
    conditions:
      - condition: "{{ classification == 'urgent' }}"
        next_agents: [urgent_handler]
      - condition: "{{ classification == 'normal' }}"
        next_agents: [normal_handler]
```

Integration Points
-----------------

**Memory System:**
- Automatic event logging for all agent executions
- Context preservation in memory backend
- Trace ID propagation for request tracking
- Performance metrics collection

**Error Handling:**
- Structured error reporting with context
- Retry mechanisms with configurable policies
- Error telemetry for monitoring and alerting
- Recovery action recommendations

**Monitoring:**
- Real-time execution metrics
- Agent performance tracking
- Resource usage monitoring
- Error rate and pattern analysis
"""

import asyncio
import inspect
import json
import logging
import os
from concurrent.futures import ThreadPoolExecutor
from datetime import UTC, datetime
from time import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, TypeVar, cast

from .base import OrchestratorBase
from .error_handling import ErrorHandler as OrchestratorErrorHandling
from .metrics import MetricsCollector as OrchestratorMetricsCollector
from .prompt_rendering import PromptRenderer

logger = logging.getLogger(__name__)


# Define a type variable that is bound to ExecutionEngine and includes all necessary attributes
class ExecutionEngineProtocol(OrchestratorBase):
    """Protocol defining required attributes for ExecutionEngine type variable."""

    # Registry of agent instances keyed by agent id (populated by ExecutionEngine.__init__)
    agents: Dict[str, Any]


# Type variable for methods that must return/accept the concrete engine subtype.
T = TypeVar("T", bound="ExecutionEngineProtocol")
class ExecutionEngine(
    OrchestratorBase, PromptRenderer, OrchestratorErrorHandling, OrchestratorMetricsCollector
):
    """
    ExecutionEngine coordinates complex multi-agent workflows within the OrKa framework.

    Core Features:
    - Agent execution with precise coordination
    - Rich context flow across workflow steps
    - Fault tolerance with automatic recovery
    - Real-time optimization and resource management
    - Scalable architecture for distributed execution

    Execution Patterns:

    Sequential Processing:
    ```yaml
    orchestrator:
      strategy: sequential
      agents: [classifier, router, processor, responder]
    ```

    Parallel Processing:
    ```yaml
    orchestrator:
      strategy: parallel
      agents: [validator_1, validator_2, validator_3]
    ```

    Decision Tree:
    ```yaml
    orchestrator:
      strategy: decision-tree
      agents: [classifier, router, [path_a, path_b], aggregator]
    ```

    Advanced Features:
    - Intelligent retry logic with exponential backoff
    - Real-time monitoring and performance metrics
    - Resource management and connection pooling
    - Production-ready distributed capabilities

    Use Cases:
    - Multi-step AI reasoning workflows
    - High-throughput content processing pipelines
    - Real-time decision systems with complex branching
    - Fault-tolerant distributed AI applications
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Initialize the engine, its agent registry, and the orchestrator self-reference."""
        super().__init__(*args, **kwargs)
        # Initialize PromptRenderer explicitly to ensure render_prompt method is available
        # even if the cooperative super().__init__ chain skipped it.
        PromptRenderer.__init__(self)
        # Registry of agent instances keyed by agent id; populated externally after construction.
        self.agents: Dict[str, Any] = {}
        # Set orchestrator reference for fork/join nodes - ExecutionEngine is part of Orchestrator
        self.orchestrator = self
[docs] async def run(self: "ExecutionEngine", input_data: Any, return_logs: bool = False) -> Any: """ Execute the orchestrator with the given input data. Args: input_data: The input data for the orchestrator return_logs: If True, return full logs; if False, return final response (default: False) Returns: Either the logs array or the final response based on return_logs parameter """ logs: List[Any] = [] try: result = await self._run_with_comprehensive_error_handling( input_data, logs, return_logs, ) return result except Exception as e: self._record_error( "orchestrator_execution", "main", f"Orchestrator execution failed: {e}", e, recovery_action="fail", ) logger.critical(f"[ORKA-CRITICAL] Orchestrator execution failed: {e}") raise
async def _run_with_comprehensive_error_handling( self: "ExecutionEngine", input_data: Any, logs: List[Dict[str, Any]], return_logs: bool = False, ) -> Any: """ Main execution loop with comprehensive error handling wrapper. Args: input_data: The input data for the orchestrator logs: List to store execution logs return_logs: If True, return full logs; if False, return final response """ try: queue = self.orchestrator_cfg["agents"][:] while queue: agent_id = queue.pop(0) try: agent = self.agents[agent_id] agent_type = agent.type self.step_index += 1 # Build payload for the agent: current input and all previous outputs payload = { "input": input_data, "previous_outputs": self.build_previous_outputs(logs), } # Add orchestrator to context for nodes that need it if agent_type in ("forknode", "graphscoutagent"): payload["orchestrator"] = self freezed_payload = json.dumps( {k: v for k, v in payload.items() if k != "orchestrator"}, ) # Freeze the payload as a string for logging/debug, excluding orchestrator logger.info( f"Running agent '{agent_id}' of type '{agent_type}', payload: {freezed_payload}", ) log_entry = { "agent_id": agent_id, "event_type": agent.__class__.__name__, "timestamp": datetime.now(UTC).isoformat(), } start_time = time() # Attempt to run agent with retry logic max_retries = 3 retry_count = 0 agent_result = None while retry_count < max_retries: try: # Execute the agent with appropriate method # Execute agent with template rendering and context preservation _, agent_result = await self._run_agent_async( agent_id, payload.get("input", payload), payload.get("previous_outputs", {}), full_payload=payload, # Pass full payload including orchestrator ) # If agent is waiting (e.g., for async input), return waiting status if ( isinstance(agent_result, dict) and agent_result.get("status") == "waiting" ): logger.info( f"Agent '{agent_id}' returned waiting status: {agent_result}", ) # Put agent back in queue to retry later queue.append(agent_id) break # If we got a 
result, break retry loop if agent_result is not None: break retry_count += 1 if retry_count < max_retries: logger.warning( f"Agent '{agent_id}' failed (attempt {retry_count}/{max_retries}): {agent_result}", ) await asyncio.sleep(1) # Wait before retry else: logger.error( f"Agent '{agent_id}' failed after {max_retries} attempts", ) except Exception as e: retry_count += 1 if retry_count < max_retries: logger.warning( f"Agent '{agent_id}' failed (attempt {retry_count}/{max_retries}): {e}", ) await asyncio.sleep(1) # Wait before retry else: logger.error( f"Agent '{agent_id}' failed after {max_retries} attempts: {e}", ) raise # Process agent result if agent_result is not None: # Special handling for router nodes if agent_type == "routernode": if isinstance(agent_result, list): queue = agent_result + queue continue # Skip to the next agent in the new queue # Special handling for GraphScout decisions if agent_type == "graphscoutagent": if isinstance(agent_result, dict) and "decision" in agent_result: # CRITICAL: Log GraphScout result BEFORE routing to ensure it appears in traces payload_out = { k: v for k, v in payload.items() if k != "orchestrator" } # 🔍 DEBUG: Check agent_result before logging logger.info( f"🔍 DEBUG: agent_result before logging: {agent_result.get('target')}" ) payload_out.update( agent_result ) # Include the full GraphScout result # 🔍 DEBUG: Check payload_out after update logger.info( f"🔍 DEBUG: payload_out target after update: {payload_out.get('target')}" ) # Log GraphScout execution log_data = { "agent_id": agent_id, "event_type": "GraphScoutAgent", "timestamp": datetime.now(UTC).isoformat(), "payload": payload_out, "step": self.step_index, "run_id": self.run_id, } logs.append(log_data) # Log to memory backend if self.memory: self.memory.log( agent_id, "GraphScoutAgent", payload_out, step=self.step_index, run_id=self.run_id, ) # Note: Don't modify payload directly to avoid recursion issues # The result will be stored in logs and available for 
subsequent agents # Now handle routing decisions # Create a copy to avoid modifying the logged result decision_type = agent_result.get("decision") target = agent_result.get("target") # Work with a copy of the target to avoid modifying the original if isinstance(target, list): target = target.copy() if decision_type == "commit_next" and target: # Route to single next agent - REPLACE entire queue initial_queue = [str(target)] # ✅ STRUCTURED ENFORCEMENT: Validate terminal agent queue = self._validate_and_enforce_terminal_agent(initial_queue) logger.info(f"GraphScout routing to: {target}") continue elif decision_type == "commit_path" and target: # Route to path sequence - REPLACE entire queue if isinstance(target, list): # ✅ STRUCTURED ENFORCEMENT: Validate terminal agent queue = self._validate_and_enforce_terminal_agent(target) logger.info(f"GraphScout routing to path: {queue}") continue elif decision_type == "shortlist": # Apply intelligent memory agent routing and execute sequence shortlist = agent_result.get("target", []) if shortlist: # Apply memory agent routing logic agent_sequence = self._apply_memory_routing_logic(shortlist) # Execute the intelligently ordered sequence queue = agent_sequence logger.info( f"GraphScout executing intelligently routed sequence: {' → '.join(agent_sequence)} ({len(agent_sequence)} agents)" ) continue # For fallback or other decisions, continue normal execution # Create a copy of the payload for logging (without orchestrator) payload_out = {k: v for k, v in payload.items() if k != "orchestrator"} # Handle different result types if isinstance(agent_result, dict): # Case 1: Local LLM agent response if "response" in agent_result: payload_out.update( { "response": agent_result["response"], "confidence": agent_result.get("confidence", "0.0"), "internal_reasoning": agent_result.get( "internal_reasoning", "" ), "_metrics": agent_result.get("_metrics", {}), "formatted_prompt": agent_result.get( "formatted_prompt", "" ), } ) # Case 2: Memory 
agent response elif "memories" in agent_result: payload_out.update( { "memories": agent_result["memories"], "query": agent_result.get("query", ""), "backend": agent_result.get("backend", ""), "search_type": agent_result.get("search_type", ""), "num_results": agent_result.get("num_results", 0), } ) # Case 3: Fork/Join node response elif "status" in agent_result: payload_out.update(agent_result) # Case 4: Other result types else: payload_out["result"] = agent_result else: # Case 5: Non-dict result payload_out["result"] = agent_result # Special handling for fork and join nodes if agent_type == "forknode": # Fork node logs immediately, then executes children fork_group_id = agent_result.get("fork_group") if fork_group_id: payload_out["fork_group_id"] = fork_group_id payload_out["fork_execution_status"] = "initiated" # Log fork node immediately log_entry = { "agent_id": agent_id, "event_type": agent.__class__.__name__, "timestamp": datetime.now(UTC).isoformat(), "payload": payload_out.copy(), "step": self.step_index, "run_id": self.run_id, "previous_outputs": self.build_previous_outputs(logs), } logs.append(log_entry) # Log to memory backend immediately self.memory.log( agent_id, agent.__class__.__name__, payload_out.copy(), step=self.step_index, run_id=self.run_id, previous_outputs=self.build_previous_outputs(logs[:-1]), ) # Execute forked agents after logging fork forked_agents = agent_result.get("agents", []) if forked_agents: logger.info( f"Executing {len(forked_agents)} forked agents for group {fork_group_id}" ) # Get current context for forked execution current_previous_outputs = self.build_previous_outputs(logs) # Execute in parallel try: fork_logs = await self.run_parallel_agents( forked_agents, fork_group_id, input_data, current_previous_outputs, ) # Add fork logs to main logs logs.extend(fork_logs) logger.info( f"Completed execution of {len(fork_logs)} forked agents" ) except Exception as fork_error: logger.error(f"Fork execution failed: {fork_error}") # Skip 
normal logging since we already logged the fork node continue elif agent_type == "joinnode": # Join node aggregates forked results # Get fork_group_id from payload or use the configured group fork_group_id = payload.get("fork_group_id") if not fork_group_id and hasattr(agent, "group_id"): # Use configured group (e.g., "fork_3" from YAML) fork_group_id = agent.group_id if fork_group_id: # Generate the actual fork group ID pattern (fork_3_timestamp) # Look for any fork group that starts with our configured group actual_fork_group_id = None for log in logs: log_fork_group = log.get("fork_group_id") if log_fork_group and log_fork_group.startswith( f"{fork_group_id}_" ): actual_fork_group_id = log_fork_group break if actual_fork_group_id: # Collect all forked results from logs forked_results = [] for log in logs: if log.get( "fork_group_id" ) == actual_fork_group_id and log.get( "event_type", "" ).startswith( "ForkedAgent-" ): forked_results.append( { "agent_id": log["agent_id"], "result": log["payload"], "step": log.get("step"), "timestamp": log.get("timestamp"), } ) # Add joined results to payload payload_out["joined_results"] = forked_results payload_out["fork_group_id"] = actual_fork_group_id agent_result["fork_group_id"] = actual_fork_group_id logger.info( f"Join node collected {len(forked_results)} results from fork group {actual_fork_group_id}" ) else: logger.warning( f"Join node could not find fork group matching pattern '{fork_group_id}_*'" ) else: logger.warning( f"Join node '{agent_id}' has no fork_group_id or group configuration" ) # Store the result in memory result_key = f"agent_result:{agent_id}" self.memory.set(result_key, json.dumps(payload_out)) logger.debug(f"- Stored result for agent {agent_id}") # Store in Redis hash for group tracking group_key = "agent_results" self.memory.hset(group_key, agent_id, json.dumps(payload_out)) logger.debug(f"- Stored result in group for agent {agent_id}") # Add to logs log_entry["payload"] = payload_out 
logs.append(log_entry) # ✅ FIX: Log to memory backend like forked agents self.memory.log( agent_id, agent.__class__.__name__, payload_out, step=self.step_index, run_id=self.run_id, previous_outputs=self.build_previous_outputs( logs[:-1] ), # Exclude current log ) self.memory.memory.append(log_entry) # Keep for file trace compatibility except Exception as agent_error: # Log the error and continue with next agent logger.error(f"Error executing agent {agent_id}: {agent_error}") continue # Generate meta report with aggregated metrics meta_report = self._generate_meta_report(logs) # Store meta report in memory for saving meta_report_entry = { "agent_id": "meta_report", "event_type": "MetaReport", "timestamp": datetime.now(UTC).isoformat(), "payload": { "meta_report": meta_report, "run_id": self.run_id, "timestamp": datetime.now().strftime("%Y%m%d_%H%M%S"), }, } self.memory.memory.append(meta_report_entry) # Save logs to file at the end of the run log_dir = os.getenv("ORKA_LOG_DIR", "logs") os.makedirs(log_dir, exist_ok=True) log_path = os.path.join( log_dir, f"orka_trace_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json" ) # Save enhanced trace with memory backend data enhanced_trace = self._build_enhanced_trace(logs, meta_report) self.memory.save_enhanced_trace(log_path, enhanced_trace) # Cleanup memory backend resources to prevent hanging try: if hasattr(self.memory, "close"): self.memory.close() except Exception as e: logger.warning(f"Warning: Failed to cleanly close memory backend: {e!s}") # Print meta report summary logger.info("\n" + "=" * 50) logger.info("ORKA EXECUTION META REPORT") logger.info("=" * 50) logger.info(f"Total Execution Time: {meta_report['total_duration']:.3f}s") logger.info(f"Total LLM Calls: {meta_report['total_llm_calls']}") logger.info(f"Total Tokens: {meta_report['total_tokens']}") logger.info(f"Total Cost: ${meta_report['total_cost_usd']:.6f}") logger.info(f"Average Latency: {meta_report['avg_latency_ms']:.2f}ms") logger.info("=" * 50) # 
Return either logs or final response based on parameter if return_logs: # Return full logs for internal workflows (like loop nodes) return logs else: # Extract the final response from the last non-memory agent for user-friendly output final_response = self._extract_final_response(logs) return final_response except Exception as e: # Handle any unexpected errors logger.error(f"Unexpected error in execution engine: {e}") raise async def _run_agent_async( self: "ExecutionEngine", agent_id: str, input_data: Any, previous_outputs: Dict[str, Any], full_payload: Optional[Dict[str, Any]] = None, ) -> Tuple[str, Any]: """ Run a single agent asynchronously. """ agent = self.agents[agent_id] # Create a complete payload with all necessary context payload = { "input": input_data, "previous_outputs": previous_outputs, } # Include orchestrator context from full_payload if available if full_payload and "orchestrator" in full_payload: payload["orchestrator"] = full_payload["orchestrator"] logger.debug(f"- Agent '{agent_id}' inherited orchestrator context from full_payload") # Add loop context if available if isinstance(input_data, dict): if "loop_number" in input_data: payload["loop_number"] = input_data["loop_number"] if "past_loops_metadata" in input_data: payload["past_loops_metadata"] = input_data["past_loops_metadata"] # Render prompt before running agent if agent has a prompt # Also check for ValidationAndStructuringAgent which stores prompt in llm_agent agent_prompt = None if hasattr(agent, "prompt") and agent.prompt: agent_prompt = agent.prompt elif ( hasattr(agent, "llm_agent") and hasattr(agent.llm_agent, "prompt") and agent.llm_agent.prompt ): agent_prompt = agent.llm_agent.prompt if agent_prompt: try: # Build complete template context template_context = self._build_template_context(payload, agent_id) # Debug template context if needed logger.debug( f"- Template context for '{agent_id}': {list(template_context.keys())}" ) if "get_input" in template_context: logger.debug( 
f"Helper functions available: get_input, get_loop_number, get_agent_response" ) # Test if functions are callable try: test_input = template_context["get_input"]() logger.debug(f"- get_input() test successful: '{test_input}'") except Exception as e: logger.error(f"get_input() test failed: {e}") else: logger.error("Helper functions NOT found in template context") logger.error(f"Available keys: {list(template_context.keys())}") # Validate template before rendering missing_vars = self._validate_template_variables(agent_prompt, template_context) if missing_vars: logger.warning(f"Agent '{agent_id}' template missing variables: {missing_vars}") # Enhanced debugging for template issues prev_outputs = template_context.get("previous_outputs", {}) logger.warning( f"Available agents in previous_outputs: {list(prev_outputs.keys())}" ) for agent_name, agent_result in prev_outputs.items(): if isinstance(agent_result, dict): logger.warning(f" {agent_name}: keys = {list(agent_result.keys())}") else: logger.warning(f" {agent_name}: {type(agent_result)} = {agent_result}") logger.debug(f"- Available context keys: {list(template_context.keys())}") if "previous_outputs" in template_context: logger.debug( f"Available previous_outputs: {list(template_context['previous_outputs'].keys())}" ) # Show structure of each agent result for debugging for prev_agent, prev_result in template_context["previous_outputs"].items(): if isinstance(prev_result, dict): logger.debug( f"Agent '{prev_agent}' result keys: {list(prev_result.keys())}" ) else: logger.debug( f"[DEBUG] - Agent '{prev_agent}' result type: {type(prev_result)}" ) # Use template context directly from jinja2 import Template template = Template(agent_prompt) # Debug: Show what we're about to render logger.debug( f"- About to render template with {len(template_context)} context items" ) logger.debug(f"- Template preview: {agent_prompt[:200]}...") formatted_prompt = template.render(**template_context) # Log successful rendering 
logger.info(f"Template rendered for '{agent_id}' - length: {len(formatted_prompt)}") # Debug: Show a preview of the rendered result logger.debug(f"- Rendered preview: {formatted_prompt[:200]}...") # Check for unresolved variables and warn if found import re unresolved_pattern = r"\{\{\s*[^}]+\s*\}\}" unresolved_vars = re.findall(unresolved_pattern, formatted_prompt) if unresolved_vars: logger.warning( f"Still found {len(unresolved_vars)} unresolved variables after rendering: {unresolved_vars[:3]}" ) # Replace unresolved variables with empty strings for now formatted_prompt = re.sub(unresolved_pattern, "", formatted_prompt) formatted_prompt = re.sub(r"\s+", " ", formatted_prompt).strip() payload["formatted_prompt"] = formatted_prompt # Verify rendering was successful if self._has_unresolved_variables(formatted_prompt): logger.error( f"Agent '{agent_id}' has unresolved template variables in: {formatted_prompt}" ) payload["template_error"] = "unresolved_variables" # Debug logging for template rendering if logger.isEnabledFor(logging.DEBUG): original_template = agent_prompt if original_template != formatted_prompt: logger.debug(f"- Agent '{agent_id}' template rendered successfully") logger.debug(f"- Original: {original_template}") logger.debug(f"- Rendered: {formatted_prompt}") else: logger.debug( f"Agent '{agent_id}' template unchanged - possible template issue" ) logger.debug(f"- Template context: {template_context}") except Exception as e: logger.error(f"Failed to render prompt for agent '{agent_id}': {e}") payload["formatted_prompt"] = agent_prompt if agent_prompt else "" payload["template_error"] = str(e) # Inspect the run method to see if it needs orchestrator run_method = agent.run sig = inspect.signature(run_method) needs_orchestrator = len(sig.parameters) > 1 # More than just 'self' is_async = inspect.iscoroutinefunction(run_method) # Log orchestrator context detection logger.debug(f"- Agent '{agent_id}' run method signature: {sig}") logger.debug(f"- Agent 
'{agent_id}' parameter count: {len(sig.parameters)}") logger.debug(f"- Agent '{agent_id}' needs_orchestrator: {needs_orchestrator}") logger.debug(f"- Agent '{agent_id}' is_async: {is_async}") logger.debug(f"- Agent '{agent_id}' agent type: {type(agent).__name__}") # Execute the agent with appropriate method try: if needs_orchestrator: # Node that needs orchestrator - create context with orchestrator reference context_with_orchestrator = { **payload, "orchestrator": self.orchestrator, # Pass the actual orchestrator } # Log orchestrator context passing logger.debug( f"Agent '{agent_id}' orchestrator context keys: {list(context_with_orchestrator.keys())}" ) logger.debug( f"Agent '{agent_id}' orchestrator object: {type(self.orchestrator).__name__}" ) logger.debug( f"Agent '{agent_id}' orchestrator has fork_manager: {hasattr(self.orchestrator, 'fork_manager')}" ) result = run_method(context_with_orchestrator) if is_async or asyncio.iscoroutine(result): result = await result elif is_async: # Async node/agent that doesn't need orchestrator result = await run_method(payload) else: # Synchronous agent loop = asyncio.get_event_loop() with ThreadPoolExecutor() as pool: result = await loop.run_in_executor(pool, run_method, payload) return agent_id, result except Exception as e: logger.error(f"Failed to execute agent '{agent_id}': {e}") raise async def _run_branch_async( self: "ExecutionEngine", branch_agents: List[str], input_data: Any, previous_outputs: Dict[str, Any], ) -> Dict[str, Any]: """ Run a sequence of agents in a branch sequentially. """ branch_results = {} for agent_id in branch_agents: agent_id, result = await self._run_agent_async( agent_id, input_data, previous_outputs, full_payload=None, # No orchestrator context needed for branch agents ) branch_results[agent_id] = result # Update previous_outputs for the next agent in the branch previous_outputs = {**previous_outputs, **branch_results} return branch_results
[docs] async def run_parallel_agents( self: "ExecutionEngine", agent_ids: List[str], fork_group_id: str, input_data: Any, previous_outputs: Dict[str, Any], ) -> List[Dict[str, Any]]: """ Enhanced parallel execution with better error handling and logging. Returns a list of log entries for each forked agent. """ logger.info( f"Starting parallel execution of {len(agent_ids)} agents in fork group {fork_group_id}" ) # Validate agents exist missing_agents = [aid for aid in agent_ids if aid not in self.agents] if missing_agents: raise ValueError(f"Missing agents for parallel execution: {missing_agents}") # Ensure complete context is passed to forked agents enhanced_previous_outputs = self._ensure_complete_context(previous_outputs) # Get fork node configuration fork_node_id = "_".join(fork_group_id.split("_")[:-1]) fork_node = self.agents.get(fork_node_id) if not fork_node: logger.warning(f"Fork node {fork_node_id} not found, using default execution") branches = [[agent_id] for agent_id in agent_ids] # Treat each as separate branch else: branches = getattr(fork_node, "targets", [[agent_id] for agent_id in agent_ids]) logger.debug(f"- Executing {len(branches)} branches: {branches}") # Execute branches in parallel try: branch_tasks = [ self._run_branch_async(branch, input_data, enhanced_previous_outputs.copy()) for branch in branches ] # Wait for all branches with timeout branch_results = await asyncio.wait_for( asyncio.gather(*branch_tasks, return_exceptions=True), timeout=300, # 5 minute timeout ) # Process results and handle exceptions result_logs: List[Dict[str, Any]] = [] updated_previous_outputs = enhanced_previous_outputs.copy() for i, branch_result in enumerate(branch_results): if isinstance(branch_result, BaseException): logger.error(f"Branch {i} failed: {branch_result}") # Create error log entry error_log = { "agent_id": f"branch_{i}_error", "event_type": "BranchError", "timestamp": datetime.now(UTC).isoformat(), "payload": {"error": str(branch_result)}, "step": 
f"{self.step_index}[{i}]", "run_id": self.run_id, } result_logs.append(error_log) continue # Process successful branch results for agent_id, result in branch_result.items(): step_index = f"{self.step_index}[{len(result_logs)}]" # Store result in Redis for JoinNode join_state_key = "waitfor:join_parallel_checks:inputs" self.memory.hset(join_state_key, agent_id, json.dumps(result)) # Create log entry agent = self.agents[agent_id] # Safely flatten result structure: # - If the agent returned a dict, use it directly to avoid double nesting # - Otherwise, wrap non-dict results under "result" for consistency if isinstance(result, dict): payload_data = result.copy() else: payload_data = {"result": result} # Ensure a formatted_prompt exists; if missing, render via fallback if "formatted_prompt" not in payload_data: payload_context = { "input": input_data, "previous_outputs": updated_previous_outputs, } # Add loop context when available to keep templates consistent if isinstance(input_data, dict): if "loop_number" in input_data: payload_context["loop_number"] = input_data["loop_number"] if "past_loops_metadata" in input_data: payload_context["past_loops_metadata"] = input_data[ "past_loops_metadata" ] self._add_prompt_to_payload(agent, payload_data, payload_context) log_data = { "agent_id": agent_id, "event_type": f"ForkedAgent-{agent.__class__.__name__}", "timestamp": datetime.now(UTC).isoformat(), "payload": payload_data, "step": len(result_logs), # Use numeric step index "run_id": self.run_id, "fork_group_id": fork_group_id, } result_logs.append(log_data) # Log to memory backend self.memory.log( agent_id, f"ForkedAgent-{agent.__class__.__name__}", payload_data, step=len(result_logs), run_id=self.run_id, fork_group=fork_group_id, previous_outputs=updated_previous_outputs.copy(), ) # Update context for next agents updated_previous_outputs[agent_id] = result logger.info(f"Parallel execution completed: {len(result_logs)} results") return result_logs except 
asyncio.TimeoutError: logger.error(f"Parallel execution timed out for fork group {fork_group_id}") raise except Exception as e: logger.error(f"Parallel execution failed: {e}") raise
def _ensure_complete_context(self, previous_outputs: Dict[str, Any]) -> Dict[str, Any]: """ Generic method to ensure previous_outputs has complete context for template rendering. This handles various agent result structures and ensures templates can access data. """ enhanced_outputs = {} for agent_id, agent_result in previous_outputs.items(): # Start with the original result enhanced_outputs[agent_id] = agent_result # If the result is a complex structure, ensure it's template-friendly if isinstance(agent_result, dict): # Handle different common agent result patterns # Pattern 1: Direct result (like memory nodes) if "memories" in agent_result and isinstance(agent_result["memories"], list): enhanced_outputs[agent_id] = { **agent_result, # Keep original structure "memories": agent_result["memories"], # Direct access } # Pattern 2: Local LLM agent response elif "response" in agent_result: enhanced_outputs[agent_id] = { **agent_result, # Keep original structure "response": agent_result["response"], # Direct access "confidence": agent_result.get("confidence", "0.0"), "internal_reasoning": agent_result.get("internal_reasoning", ""), "_metrics": agent_result.get("_metrics", {}), "formatted_prompt": agent_result.get("formatted_prompt", ""), } # Pattern 3: Nested result structure elif "result" in agent_result and isinstance(agent_result["result"], dict): nested_result = agent_result["result"] # For nested structures, also provide direct access to common fields if "response" in nested_result: enhanced_outputs[agent_id] = { **agent_result, # Keep original structure "response": nested_result["response"], # Direct access "confidence": nested_result.get("confidence", "0.0"), "internal_reasoning": nested_result.get("internal_reasoning", ""), "_metrics": nested_result.get("_metrics", {}), "formatted_prompt": nested_result.get("formatted_prompt", ""), } elif "memories" in nested_result: enhanced_outputs[agent_id] = { **agent_result, # Keep original structure "memories": 
nested_result["memories"], # Direct access "query": nested_result.get("query", ""), "backend": nested_result.get("backend", ""), "search_type": nested_result.get("search_type", ""), "num_results": nested_result.get("num_results", 0), } # Pattern 4: Fork/Join node responses elif "status" in agent_result: enhanced_outputs[agent_id] = { **agent_result, # Keep original structure "status": agent_result["status"], "fork_group": agent_result.get("fork_group", ""), "merged": agent_result.get("merged", {}), } # Pattern 5: Other dict structures else: enhanced_outputs[agent_id] = agent_result # Pattern 6: Non-dict results else: enhanced_outputs[agent_id] = agent_result return enhanced_outputs
[docs] def enqueue_fork(self: "ExecutionEngine", agent_ids: List[str], fork_group_id: str) -> None: """ Add agents to the fork queue for processing. """ for agent_id in agent_ids: self.queue.append(agent_id)
def _extract_final_response(self: "ExecutionEngine", logs: List[Dict[str, Any]]) -> Any:
    """
    Extract the response from the last non-memory agent to return as the main result.

    Args:
        logs: List of agent execution logs

    Returns:
        The response from the last non-memory agent, or logs if no suitable agent found
    """
    # Memory agent types that should be excluded from final response consideration.
    # NOTE(review): this set is never referenced in the selection loop below —
    # confirm whether exclusion filtering was intended to be applied.
    excluded_agent_types = {
        "MemoryReaderNode",
        "MemoryWriterNode",
        "memory",
        "memoryreadernode",
        "memorywriternode",
        "validate_and_structure",  # Exclude validator agents
        "guardian",  # Exclude agents with 'guardian' in their name/type
    }

    # Agent types that are explicitly designed to provide a final answer
    final_response_agent_types = {
        "OpenAIAnswerBuilder",
        "LocalLLMAgent",
    }

    # Find the last suitable agent, scanning from the most recent log backwards.
    final_response_log_entry = None
    for log_entry in reversed(logs):
        _event_type = log_entry.get("event_type")
        if _event_type == "MetaReport":
            continue  # Skip meta reports

        # Prefer nested payload.result.response if present
        payload = log_entry.get("payload", {})
        nested_result = payload.get("result")
        if isinstance(nested_result, dict) and "response" in nested_result:
            return nested_result["response"]
        # Handle one extra nesting level: payload.result.result.response
        if isinstance(nested_result, dict):
            deeper_result = nested_result.get("result")
            if isinstance(deeper_result, dict) and "response" in deeper_result:
                return deeper_result["response"]

        # Prioritize agents explicitly designed to provide a final answer
        if _event_type in final_response_agent_types:
            # If no specific final response agent is found, consider the last non-excluded agent
            payload = log_entry.get("payload", {})
            final_response_log_entry = log_entry
            # NOTE(review): only breaks when the payload carries a result/response;
            # otherwise the scan continues and an earlier final-response agent may
            # overwrite this candidate — confirm this is the intended precedence.
            if payload and ("result" in payload or "response" in payload):
                final_response_log_entry = log_entry
                break

    if not final_response_log_entry:
        logger.warning("No suitable final agent found, returning full logs")
        return logs

    # Extract the response from the final response log entry
    payload = final_response_log_entry.get("payload", {})
    response = payload.get("response", {})
    logger.info(
        f"[ORKA-FINAL] Returning response from final agent: {final_response_log_entry.get('agent_id')}",
    )

    # Try to extract a clean response from the result
    if isinstance(response, dict):
        # Look for common response patterns
        if "response" in response:
            return response["response"]
        elif "result" in response:
            nested_result = response["result"]
            if isinstance(nested_result, dict):
                # Handle nested dict structure
                if "response" in nested_result:
                    return nested_result["response"]
                else:
                    return nested_result
            elif isinstance(nested_result, str):
                return nested_result
            else:
                return str(nested_result)
        else:
            # Return the entire result if no specific response field found
            return response
    elif isinstance(response, str):
        return response
    else:
        # Fallback to string representation
        return str(response)

def _build_enhanced_trace(
    self, logs: List[Dict[str, Any]], meta_report: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Build enhanced trace with memory backend references, metadata, and meta report."""
    enhanced_trace: Dict[str, Any] = {
        "execution_metadata": {
            "run_id": self.run_id,
            "total_agents": len(logs),
            "execution_time": datetime.now(UTC).isoformat(),
            "memory_backend": type(self.memory).__name__,
            "version": "1.1.0",  # Enhanced trace format
        },
        "memory_stats": self.memory.get_memory_stats(),
        "agent_executions": [],
    }

    # Include meta report if provided
    if meta_report:
        enhanced_trace["meta_report"] = meta_report

    for log_entry in logs:
        enhanced_entry = log_entry.copy()
        agent_id = log_entry.get("agent_id")
        if agent_id:
            try:
                # Add memory backend references (only for RedisStack)
                recent_memories = []
                if hasattr(self.memory, "search_memories"):
                    recent_memories = self.memory.search_memories(
                        query="", node_id=agent_id, num_results=3, log_type="log"
                    )
                enhanced_entry["memory_references"] = [
                    {
                        "key": mem.get("key", ""),
                        "timestamp": mem.get("timestamp"),
                        # Truncate long stored content to a 100-char preview.
                        "content_preview": (
                            mem.get("content", "")[:100] + "..."
                            if len(mem.get("content", "")) > 100
                            else mem.get("content", "")
                        ),
                    }
                    for mem in recent_memories
                ]

                # Check template resolution status
                payload = enhanced_entry.get("payload", {})
                formatted_prompt = payload.get("formatted_prompt", "")
                original_prompt = payload.get("prompt", "")
                enhanced_entry["template_resolution"] = {
                    "has_template": bool(original_prompt),
                    "was_rendered": formatted_prompt != original_prompt,
                    "has_unresolved_vars": self._check_unresolved_variables(formatted_prompt),
                    "variable_count": len(self._extract_template_variables(original_prompt)),
                }
            except Exception as e:
                # Trace enhancement is best-effort; record the error and continue.
                logger.warning(f"Could not enhance trace for agent {agent_id}: {e}")
                enhanced_entry["enhancement_error"] = str(e)

        enhanced_trace["agent_executions"].append(enhanced_entry)

    return enhanced_trace

def _check_unresolved_variables(self, text: str) -> bool:
    """Check if text contains unresolved Jinja2 variables."""
    import re

    pattern = r"\{\{\s*[^}]+\s*\}\}"
    return bool(re.search(pattern, text))

def _has_unresolved_variables(self, text: str) -> bool:
    """Alias for _check_unresolved_variables for backward compatibility."""
    return self._check_unresolved_variables(text)

def _extract_template_variables(self, template: str) -> List[str]:
    """Extract all Jinja2 variables from template."""
    import re

    pattern = r"\{\{\s*([^}]+)\s*\}\}"
    return re.findall(pattern, template)

def _build_template_context(self, payload: Dict[str, Any], agent_id: str) -> Dict[str, Any]:
    """Build complete context for template rendering."""
    # Start with original payload
    context = payload.copy()

    # Ensure previous_outputs exists and is properly structured
    if "previous_outputs" not in context:
        context["previous_outputs"] = {}

    # Add commonly expected template variables
    context.update(
        {
            "run_id": getattr(self, "run_id", "unknown"),
            "step_index": getattr(self, "step_index", 0),
            "agent_id": agent_id,
            "current_time": datetime.now(UTC).isoformat(),
            "workflow_name": getattr(self, "workflow_name", "unknown"),
        }
    )

    # Add input data at root level if nested
    if "input" in context and isinstance(context["input"], dict):
        input_data = context["input"]
        # Common template variables that should be at root
        for var in ["loop_number", "past_loops_metadata", "user_input", "query"]:
            if var in input_data:
                context[var] = input_data[var]

    # Flatten previous_outputs for easier template access
    prev_outputs = context.get("previous_outputs", {})
    flattened_outputs = {}
    for agent_name, agent_result in prev_outputs.items():
        # Create a simplified, template-friendly version of agent results
        simplified_result = self._simplify_agent_result_for_templates(agent_result)
        flattened_outputs[agent_name] = simplified_result
        # Also add flattened access patterns for backward compatibility
        if isinstance(simplified_result, dict):
            if "response" in simplified_result:
                flattened_outputs[f"{agent_name}_response"] = simplified_result["response"]
            if "memories" in simplified_result:
                flattened_outputs[f"{agent_name}_memories"] = simplified_result["memories"]

    context["previous_outputs"] = flattened_outputs

    # Add template helper functions from PromptRenderer
    try:
        helper_functions = self._get_template_helper_functions(context)
        context.update(helper_functions)
        logger.debug(f"- Added {len(helper_functions)} helper functions to template context")
    except Exception as e:
        logger.error(f"Failed to add helper functions to template context: {e}")
        logger.error(f"Exception details: {type(e).__name__}: {e}")
        import traceback

        logger.error(f"Traceback: {traceback.format_exc()}")

        # Import PromptRenderer directly to ensure access to helper functions
        # (project-local import, done lazily to avoid a cycle at module load).
        from orka.orchestrator.prompt_rendering import PromptRenderer

        # Create a temporary PromptRenderer instance to get the helper functions
        temp_renderer = PromptRenderer()
        try:
            helper_functions = temp_renderer._get_template_helper_functions(context)
            context.update(helper_functions)
            logger.info(
                f"Added {len(helper_functions)} helper functions via fallback PromptRenderer"
            )
        except Exception as fallback_e:
            logger.error(f"Fallback PromptRenderer also failed: {fallback_e}")

            # Last resort: basic helper functions (close over the local context).
            def get_input():
                if "input" in context and isinstance(context["input"], dict):
                    return context["input"].get("input", "")
                return str(context.get("input", ""))

            def get_loop_number():
                if "loop_number" in context:
                    return context["loop_number"]
                if "input" in context and isinstance(context["input"], dict):
                    return context["input"].get("loop_number", 1)
                return 1

            def get_agent_response(agent_name):
                prev_outputs = context.get("previous_outputs", {})
                if agent_name in prev_outputs:
                    agent_result = prev_outputs[agent_name]
                    if isinstance(agent_result, dict):
                        return agent_result.get("response", "")
                    return str(agent_result)
                return ""

            # Add minimal stub functions for missing ones
            def get_agent_memory_context(agent_type, agent_name):
                return "No memory context available"

            def get_debate_evolution():
                return "First round of debate"

            context.update(
                {
                    "get_input": get_input,
                    "get_loop_number": get_loop_number,
                    "get_agent_response": get_agent_response,
                    "get_agent_memory_context": get_agent_memory_context,
                    "get_debate_evolution": get_debate_evolution,
                }
            )
            logger.info("Added basic fallback helper functions to template context")

    return context

def _validate_template_variables(self, template: str, context: Dict[str, Any]) -> List[str]:
    """Check for missing template variables with detailed path validation."""
    import re

    # Extract all Jinja2 variables (filters after '|' are stripped by the pattern)
    variable_pattern = r"\{\{\s*([^}|]+)(?:\|[^}]*)?\s*\}\}"
    variables = re.findall(variable_pattern, template)

    missing_vars = []
    for var_expr in variables:
        var_path = var_expr.strip()
        # Check if the full path is accessible
        if not self._is_template_path_accessible(var_path, context):
            missing_vars.append(var_path)

    return missing_vars

def _is_template_path_accessible(self, var_path: str, context: Dict[str, Any]) -> bool:
    """Check if a nested template variable path (like 'previous_outputs.binary_classifier.response') is accessible."""
    try:
        # Handle function calls like get_input() or get_agent_response('name')
        if "(" in var_path and ")" in var_path:
            # Extract function name (before the first parenthesis)
            func_name = var_path.split("(")[0].strip()
            # Check if the function exists in context
            if func_name in context and callable(context[func_name]):
                logger.debug(
                    f"Found callable function '{func_name}' for template path '{var_path}'"
                )
                return True
            else:
                logger.debug(f"- Function '{func_name}' not found or not callable in context")
                # Show available functions for debugging
                available_funcs = [k for k, v in context.items() if callable(v)]
                logger.debug(f"- Available callable functions: {available_funcs}")
                return False

        # Split the path by dots
        path_parts = var_path.split(".")
        current = context

        for part in path_parts:
            # Handle array access like [0]
            if "[" in part and "]" in part:
                key = part.split("[")[0]
                index_str = part.split("[")[1].split("]")[0]
                if key not in current:
                    logger.debug(f"- Missing key '{key}' in path '{var_path}'")
                    return False
                current = current[key]
                try:
                    index = int(index_str)
                    if not isinstance(current, (list, tuple)):
                        logger.debug(
                            f"Expected list/tuple for array access in path '{var_path}'"
                        )
                        return False
                    if len(current) <= index:
                        logger.debug(f"Invalid array access [{index}] in path '{var_path}'")
                        return False
                    current = current[index]
                except (ValueError, IndexError):
                    logger.debug(f"- Invalid array index '{index_str}' in path '{var_path}'")
                    return False
            else:
                # Simple key access
                if not isinstance(current, dict) or part not in current:
                    # Enhanced debugging for missing keys, especially agent names
                    if isinstance(current, dict):
                        available_keys = list(current.keys())
                        logger.debug(
                            f"Missing key '{part}' in path '{var_path}'. Available keys: {available_keys}"
                        )
                        # Special handling for agent name mismatches in previous_outputs
                        if (
                            len(path_parts) > 1
                            and path_parts[0] == "previous_outputs"
                            and part == path_parts[1]
                        ):  # Looking for agent name
                            # Find similar agent names that might be the intended target
                            similar_agents = [
                                key for key in available_keys if part in key or key in part
                            ]
                            if similar_agents:
                                logger.warning(
                                    f"Template references agent '{part}' but found similar agents: {similar_agents}. Did you mean one of these?"
                                )
                            else:
                                # Show all available agents for reference
                                logger.warning(
                                    f"Template references agent '{part}' but available agents are: {available_keys}"
                                )
                    else:
                        logger.info(  # type: ignore[unreachable]
                            f"[DEBUG] - Cannot access key '{part}' in path '{var_path}' - current value is not a dict"
                        )
                    return False
                else:
                    current = current[part]

        return True
    except Exception as e:
        logger.debug(f"- Error validating template path '{var_path}': {e}")
        return False

def _simplify_agent_result_for_templates(self, agent_result: Any) -> Any:
    """
    Simplify complex agent result structures for template access.

    This method flattens nested result structures to make them easily accessible
    in Jinja2 templates with dot notation like {{ previous_outputs.agent_name.response }}.
    """
    if not isinstance(agent_result, dict):
        return agent_result

    # Start with the original result
    simplified = agent_result.copy()

    # Pattern 1: Direct response at root level (like binary classifiers)
    if "response" in agent_result:
        simplified["response"] = agent_result["response"]
        if "confidence" in agent_result:
            simplified["confidence"] = agent_result["confidence"]
        if "internal_reasoning" in agent_result:
            simplified["internal_reasoning"] = agent_result["internal_reasoning"]
        return simplified

    # Pattern 2: Nested result structure (common pattern)
    if "result" in agent_result and isinstance(agent_result["result"], dict):
        nested_result = agent_result["result"]

        # Flatten nested response to root level for easy template access
        if "response" in nested_result:
            simplified["response"] = nested_result["response"]
            if "confidence" in nested_result:
                simplified["confidence"] = nested_result.get("confidence", "0.0")
            if "internal_reasoning" in nested_result:
                simplified["internal_reasoning"] = nested_result.get("internal_reasoning", "")
            if "_metrics" in nested_result:
                simplified["_metrics"] = nested_result.get("_metrics", {})
            if "formatted_prompt" in nested_result:
                simplified["formatted_prompt"] = nested_result.get("formatted_prompt", "")

        # Handle memory results
        if "memories" in nested_result:
            simplified["memories"] = nested_result["memories"]
            simplified["query"] = nested_result.get("query", "")
            simplified["backend"] = nested_result.get("backend", "")
            simplified["search_type"] = nested_result.get("search_type", "")
            simplified["num_results"] = nested_result.get("num_results", 0)

        # Keep original nested structure for complex access
        simplified["result"] = nested_result
        return simplified

    # Pattern 3: Memory agent results
    if "memories" in agent_result:
        simplified["memories"] = agent_result["memories"]
        simplified["query"] = agent_result.get("query", "")
        simplified["backend"] = agent_result.get("backend", "")
        simplified["search_type"] = agent_result.get("search_type", "")
        simplified["num_results"] = agent_result.get("num_results", 0)
        return simplified

    # Pattern 4: Fork/Join results with merged data
    if "merged" in agent_result and isinstance(agent_result["merged"], dict):
        # Add merged results at root level for easy access
        for merged_agent_id, merged_result in agent_result["merged"].items():
            if isinstance(merged_result, dict) and "response" in merged_result:
                simplified[f"{merged_agent_id}_response"] = merged_result["response"]
        simplified["merged"] = agent_result["merged"]
        return simplified

    # Default: return as-is for other structures
    return simplified

def _select_best_candidate_from_shortlist(
    self, shortlist: List[Dict[str, Any]], question: str, context: Dict[str, Any]
) -> Dict[str, Any]:
    """
    Select the best candidate from GraphScout's shortlist.

    GraphScout has already done sophisticated evaluation including LLM assessment,
    scoring, and ranking. We should trust its decision and use the top candidate.

    Args:
        shortlist: List of candidate agents from GraphScout (already ranked by score)
        question: The user's question
        context: Execution context

    Returns:
        The best candidate from the shortlist (typically the first one)
    """
    try:
        if not shortlist:
            return {}

        # Trust GraphScout's intelligent ranking - use the top candidate
        best_candidate = shortlist[0]

        logger.info(
            f"Selected GraphScout's top choice: {best_candidate.get('node_id')} "
            f"(score: {best_candidate.get('score', 0.0):.3f})"
        )

        return best_candidate

    except Exception as e:
        logger.error(f"Candidate selection failed: {e}")
        # Return first candidate as ultimate fallback
        return shortlist[0] if shortlist else {}

def _validate_and_enforce_terminal_agent(self, queue: List[str]) -> List[str]:
    """
    Validate that the workflow queue ends with an LLM-based response builder.
    If not, automatically append the best available response builder.

    Args:
        queue: Current agent execution queue

    Returns:
        Validated queue with guaranteed LLM terminal agent
    """
    if not queue:
        return queue

    # Check if the last agent is already a response builder
    last_agent_id = queue[-1]
    if self._is_response_builder(last_agent_id):
        logger.info(f"✅ Terminal validation passed: {last_agent_id} is a response builder")
        return queue

    # Find the best response builder to append
    response_builder = self._get_best_response_builder()
    if response_builder:
        validated_queue = queue + [response_builder]
        logger.info(f"🔧 Terminal enforcement: Added {response_builder} to ensure LLM response")
        logger.info(f"📋 Final validated queue: {validated_queue}")
        return validated_queue
    else:
        logger.warning(
            "⚠️ No response builder found - workflow may not provide comprehensive response"
        )
        return queue

def _is_response_builder(self, agent_id: str) -> bool:
    """Check if an agent is a response builder."""
    if agent_id not in self.agents:
        return False

    agent = self.agents[agent_id]
    agent_type = getattr(agent, "type", "").lower()

    # Response builder identification criteria
    return (
        any(
            term in agent_type
            for term in ["localllm", "local_llm", "answer", "response", "builder"]
        )
        and "classification" not in agent_type
    )

def _apply_memory_routing_logic(self, shortlist: List[Dict[str, Any]]) -> List[str]:
    """
    Apply intelligent memory agent routing logic.

    Memory agents have special positioning rules:
    - Memory readers (read operation) should be at the beginning of the path
    - Memory writers (write operation) should be at the end of the path
    - Other agents maintain their relative order

    Args:
        shortlist: List of candidate agents from GraphScout

    Returns:
        Intelligently ordered list of agent IDs
    """
    try:
        # Separate agents by type
        memory_readers = []
        memory_writers = []
        regular_agents = []
        response_builder_found = False

        for candidate in shortlist:
            agent_id = candidate.get("node_id")
            if not agent_id:
                continue

            # Check if this is a memory agent
            if self._is_memory_agent(agent_id):
                operation = self._get_memory_operation(agent_id)
                if operation == "read":
                    memory_readers.append(agent_id)
                    logger.info(
                        f"Memory reader agent detected: {agent_id} - positioning at beginning"
                    )
                elif operation == "write":
                    memory_writers.append(agent_id)
                    logger.info(
                        f"Memory writer agent detected: {agent_id} - positioning at end"
                    )
                else:
                    # Unknown memory operation, treat as regular agent
                    regular_agents.append(agent_id)
            elif self._is_response_builder(agent_id):
                regular_agents.append(agent_id)
                response_builder_found = True
            else:
                regular_agents.append(agent_id)

        # Build the intelligent sequence: readers → regular agents → writers → response_builder
        agent_sequence = []

        # 1. Memory readers first (for context retrieval)
        agent_sequence.extend(memory_readers)

        # 2. Regular processing agents (excluding response builders for now)
        non_response_regular = [
            agent for agent in regular_agents if not self._is_response_builder(agent)
        ]
        agent_sequence.extend(non_response_regular)

        # 3. Memory writers (to store processed information)
        agent_sequence.extend(memory_writers)

        # 4. Response builder last (if not already present, add one)
        if not response_builder_found:
            response_builder = self._get_best_response_builder()
            if response_builder and response_builder not in agent_sequence:
                agent_sequence.append(response_builder)
        else:
            # Add existing response builders at the end
            response_builders = [
                agent for agent in regular_agents if self._is_response_builder(agent)
            ]
            agent_sequence.extend(response_builders)

        logger.info(
            f"Memory routing applied: readers={memory_readers}, writers={memory_writers}, regular={len(non_response_regular)}"
        )

        return agent_sequence

    except Exception as e:
        logger.error(f"Failed to apply memory routing logic: {e}")
        # Fallback to original order
        return [
            str(candidate.get("node_id"))
            for candidate in shortlist
            if candidate.get("node_id") is not None
        ]

def _is_memory_agent(self, agent_id: str) -> bool:
    """Check if an agent is a memory agent (reader or writer)."""
    try:
        if hasattr(self, "orchestrator") and hasattr(self.orchestrator, "agents"):
            agent = self.orchestrator.agents.get(agent_id)
            if agent:
                agent_class_name = agent.__class__.__name__
                return agent_class_name in ["MemoryReaderNode", "MemoryWriterNode"]
        return False
    except Exception as e:
        logger.error(f"Failed to check if {agent_id} is memory agent: {e}")
        return False

def _get_memory_operation(self, agent_id: str) -> str:
    """Get the operation type (read/write) for a memory agent."""
    try:
        if hasattr(self, "orchestrator") and hasattr(self.orchestrator, "agents"):
            agent = self.orchestrator.agents.get(agent_id)
            if agent:
                agent_class_name = agent.__class__.__name__
                if agent_class_name == "MemoryReaderNode":
                    return "read"
                elif agent_class_name == "MemoryWriterNode":
                    return "write"
        return "unknown"
    except Exception as e:
        logger.error(f"Failed to get memory operation for {agent_id}: {e}")
        return "unknown"

def _get_best_response_builder(self) -> str | None:
    """Get the best available response builder from the orchestrator configuration."""
    original_agents = self.orchestrator_cfg.get("agents", [])
    response_builders = []

    for agent_id in original_agents:
        if self._is_response_builder(agent_id):
            response_builders.append(agent_id)

    if not response_builders:
        return None

    # Priority order: response_builder > local_llm > others
    for builder in response_builders:
        if "response_builder" in builder.lower():
            return str(builder)

    # Return first available response builder
    return str(response_builders[0])