Source code for orka.orchestrator.decision_engine

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-reasoning

"""
Decision Engine
==============

Final decision-making logic for path selection.
Implements commit margin analysis, confidence thresholds, and decision types.
"""

import logging
from typing import Any, Dict, List

logger = logging.getLogger(__name__)


[docs] class DecisionEngine: """ Final decision-making engine for path selection. Analyzes scored candidates and makes final routing decisions based on: - Score margins between top candidates - Confidence thresholds - Budget constraints - Safety requirements """
[docs] def __init__(self, config: Any): """Initialize decision engine with configuration.""" self.config = config self.commit_margin = config.commit_margin self.k_beam = config.k_beam self.require_terminal = getattr(config, "require_terminal", True) logger.debug( f"DecisionEngine initialized with commit_margin={self.commit_margin}, require_terminal={self.require_terminal}" )
[docs] async def make_decision( self, scored_candidates: List[Dict[str, Any]], context: Dict[str, Any] ) -> Dict[str, Any]: """ Make final routing decision based on scored candidates. Args: scored_candidates: List of candidates with scores context: Execution context Returns: Decision object with type, target, confidence, and reasoning """ try: if not scored_candidates: return self._create_decision("fallback", None, 0.0, "No candidates available") # Sort by score (should already be sorted) scored_candidates.sort(key=lambda x: x.get("score", 0.0), reverse=True) # Check for terminal paths (2-hop chains ending with response builders) if self.require_terminal: terminal_paths = self._find_terminal_paths(scored_candidates, context) if terminal_paths: best_terminal = terminal_paths[0] terminal_path = best_terminal["path"] logger.info(f"🎯 Creating commit_path decision with target: {terminal_path}") return self._create_decision( "commit_path", terminal_path, best_terminal.get("confidence", 0.8), f"🎯 Terminal path to response builder (score={best_terminal.get('score', 0.0):.3f})", ) top_candidate = scored_candidates[0] top_score = top_candidate.get("score", 0.0) # Dynamic commit margin based on query type dynamic_margin = self._get_dynamic_margin(context) logger.debug(f"Using dynamic margin: {dynamic_margin} (base: {self.commit_margin})") # Single candidate case if len(scored_candidates) == 1: return await self._handle_single_candidate(top_candidate, context) # Multiple candidates case second_score = scored_candidates[1].get("score", 0.0) score_margin = top_score - second_score logger.debug( f"Decision analysis: top_score={top_score:.3f}, " f"second_score={second_score:.3f}, margin={score_margin:.3f}" ) # High confidence - commit to single path if score_margin >= dynamic_margin: return await self._handle_high_confidence_decision( top_candidate, score_margin, context ) # Low confidence - return shortlist return await self._handle_low_confidence_decision( scored_candidates[: self.k_beam], score_margin, context ) except Exception as e: logger.error(f"Decision making failed: {e}") return self._create_decision("fallback", None, 0.0, f"Decision engine error: {e}")
def _find_terminal_paths( self, scored_candidates: List[Dict[str, Any]], context: Dict[str, Any] ) -> List[Dict[str, Any]]: """Find terminal paths that end with response builders.""" terminal_paths = [] try: logger.info(f"🔍 Searching for terminal paths in {len(scored_candidates)} candidates") # First priority: 2-hop paths ending with response builders for candidate in scored_candidates: path = candidate.get("path", [candidate["node_id"]]) logger.debug(f"🔍 Checking path: {' → '.join(path)} (length: {len(path)})") # Prioritize 2-hop chains if len(path) == 2: last_node = path[-1] is_terminal = self._is_response_builder(last_node, context) logger.info( f"🔍 2-hop path {' → '.join(path)}: last_node='{last_node}' is_terminal={is_terminal}" ) if is_terminal: terminal_paths.append(candidate) logger.info(f"✅ Found 2-hop terminal path: {' → '.join(path)}") # If no 2-hop terminals, consider longer paths if not terminal_paths: logger.info("🔍 No 2-hop terminals found, checking longer paths...") for candidate in scored_candidates: path = candidate.get("path", [candidate["node_id"]]) if len(path) > 2: last_node = path[-1] is_terminal = self._is_response_builder(last_node, context) logger.info( f"🔍 {len(path)}-hop path {' → '.join(path)}: last_node='{last_node}' is_terminal={is_terminal}" ) if is_terminal: terminal_paths.append(candidate) logger.info( f"✅ Found {len(path)}-hop terminal path: {' → '.join(path)}" ) logger.info( f"🔍 Terminal path search complete: found {len(terminal_paths)} terminal paths" ) # Sort terminal paths by score terminal_paths.sort(key=lambda x: x.get("score", 0.0), reverse=True) return terminal_paths except Exception as e: logger.error(f"Terminal path detection failed: {e}") return [] def _get_dynamic_margin(self, context: Dict[str, Any]) -> float: """Get dynamic commit margin based on query type and context.""" try: # Check if we have classification information previous_outputs = context.get("previous_outputs", {}) # Look for input classifier output classifier_output = previous_outputs.get("input_classifier", {}) if isinstance(classifier_output, dict): classification = classifier_output.get("response", "").lower() else: classification = str(classifier_output).lower() # Dynamic margins based on query type if "factual" in classification: return 0.08 # Lower margin for factual queries elif "analytical" in classification: return 0.15 # Medium margin for analytical queries elif "technical" in classification: return 0.12 # Medium-low margin for technical queries elif "creative" in classification: return 0.2 # Higher margin for creative queries (more subjective) # Fallback: use base margin return float(self.commit_margin) except Exception as e: logger.debug(f"Dynamic margin calculation failed: {e}") return float(self.commit_margin) def _is_response_builder(self, node_id: str, context: Dict[str, Any]) -> bool: """Check if a node is a response builder.""" try: logger.debug(f"🔍 Checking if '{node_id}' is a response builder...") # Check capabilities first (most reliable) graph_state = context.get("graph_state") if graph_state and node_id in graph_state.nodes: node_obj = graph_state.nodes[node_id] if hasattr(node_obj, "capabilities"): capabilities = getattr(node_obj, "capabilities", []) logger.debug(f"🔍 Node '{node_id}' capabilities: {capabilities}") if "answer_emit" in capabilities or "response_generation" in capabilities: logger.debug(f"✅ Node '{node_id}' is response builder (capabilities)") return True # Fallback to heuristics based on node name and type patterns response_builder_patterns = [ "response_builder", "answer_builder", "final_response", "llm_response", "openai_response", ] node_id_lower = node_id.lower() logger.debug( f"🔍 Checking node name '{node_id_lower}' against patterns: {response_builder_patterns}" ) for pattern in response_builder_patterns: if pattern in node_id_lower: logger.debug( f"✅ Node '{node_id}' is response builder (name pattern: '{pattern}')" ) return True # Additional check: if the node type contains "localllm" or "openai" # and doesn't contain "classification", it's likely a response builder # This would require access to the orchestrator's node configuration # For now, we'll use the name-based heuristic logger.debug(f"❌ Node '{node_id}' is NOT a response builder") return False except Exception as e: logger.error(f"Response builder check failed: {e}") return False async def _handle_single_candidate( self, candidate: Dict[str, Any], context: Dict[str, Any] ) -> Dict[str, Any]: """Handle case with single candidate.""" try: score = candidate.get("score", 0.0) confidence = candidate.get("confidence", 0.0) # Check if single candidate is good enough if score >= 0.7 and confidence >= 0.6: path = candidate.get("path", [candidate["node_id"]]) if len(path) == 1: return self._create_decision( "commit_next", candidate["node_id"], confidence, f"Single high-quality candidate (score={score:.3f})", ) else: return self._create_decision( "commit_path", path, confidence, f"Single high-quality path (score={score:.3f})", ) else: return self._create_decision( "shortlist", [candidate], confidence, f"Single candidate with moderate quality (score={score:.3f})", ) except Exception as e: logger.error(f"Single candidate handling failed: {e}") return self._create_decision("fallback", None, 0.0, f"Single candidate error: {e}") async def _handle_high_confidence_decision( self, top_candidate: Dict[str, Any], margin: float, context: Dict[str, Any] ) -> Dict[str, Any]: """Handle high confidence decision with clear winner.""" try: path = top_candidate.get("path", [top_candidate["node_id"]]) confidence = top_candidate.get("confidence", 0.0) score = top_candidate.get("score", 0.0) # Boost confidence based on margin adjusted_confidence = min(1.0, confidence + (margin * 0.5)) if len(path) == 1: return self._create_decision( "commit_next", top_candidate["node_id"], adjusted_confidence, f"Clear winner with margin {margin:.3f} (score={score:.3f})", ) else: return self._create_decision( "commit_path", path, adjusted_confidence, f"Clear path winner with margin {margin:.3f} (score={score:.3f})", ) except Exception as e: logger.error(f"High confidence decision failed: {e}") return self._create_decision("fallback", None, 0.0, f"High confidence error: {e}") async def _handle_low_confidence_decision( self, top_candidates: List[Dict[str, Any]], margin: float, context: Dict[str, Any] ) -> Dict[str, Any]: """Handle low confidence decision with multiple viable options.""" try: # Calculate average confidence confidences = [c.get("confidence", 0.0) for c in top_candidates] avg_confidence = sum(confidences) / len(confidences) if confidences else 0.0 # Penalize confidence due to uncertainty adjusted_confidence = max(0.0, avg_confidence - 0.2) return self._create_decision( "shortlist", top_candidates, adjusted_confidence, f"Close competition with margin {margin:.3f} - returning top {len(top_candidates)} options", ) except Exception as e: logger.error(f"Low confidence decision failed: {e}") return self._create_decision("fallback", None, 0.0, f"Low confidence error: {e}") def _create_decision( self, decision_type: str, target: Any, confidence: float, reasoning: str ) -> Dict[str, Any]: """Create a standardized decision object.""" return { "decision_type": decision_type, "target": target, "confidence": confidence, "reasoning": reasoning, "trace": { "engine": "DecisionEngine", "commit_margin": self.commit_margin, "k_beam": self.k_beam, }, }