Source code for orka.orchestrator.path_scoring

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-reasoning

"""
Path Scoring System
===================

Multi-criteria scoring system for evaluating candidate paths.
Combines LLM evaluation, heuristics, historical priors, and budget considerations.
"""

import asyncio
import json
import logging
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)


class PathScorer:
    """
    Multi-criteria path scoring system.

    Evaluates candidate paths using:
    - LLM relevance assessment
    - Heuristic matching (capabilities, constraints)
    - Historical success priors
    - Cost and latency penalties
    - Safety risk assessment
    """

    def __init__(self, config: Any):
        """Initialize path scorer with configuration."""
        self.config = config
        self.score_weights = config.score_weights

        # Initialize LLM evaluator (placeholder for now)
        self.llm_evaluator = None

        logger.debug(f"PathScorer initialized with weights: {self.score_weights}")
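
    # Example weight configuration (a sketch: the keys match the score
    # components produced by `_score_candidate`, but the numeric values shown
    # here are assumptions, not OrKa defaults; real values come from `config`):
    #
    #     score_weights = {
    #         "llm": 0.4,
    #         "heuristics": 0.25,
    #         "prior": 0.15,
    #         "cost": 0.1,
    #         "latency": 0.1,
    #     }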

    async def score_candidates(
        self, candidates: List[Dict[str, Any]], question: str, context: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """
        Score all candidates using multi-criteria evaluation.

        Args:
            candidates: List of candidate paths to score
            question: The question/query being routed
            context: Execution context

        Returns:
            List of candidates with scores and components
        """
        try:
            scored_candidates = []

            # Score each candidate
            for candidate in candidates:
                score_components = await self._score_candidate(candidate, question, context)

                # Calculate final weighted score
                final_score = self._calculate_final_score(score_components)

                # Add scoring information to candidate
                candidate["score"] = final_score
                candidate["score_components"] = score_components
                candidate["confidence"] = self._calculate_confidence(score_components)

                scored_candidates.append(candidate)

            # Sort by score (descending)
            scored_candidates.sort(key=lambda x: x["score"], reverse=True)

            # Apply beam width limiting after scoring to keep only top candidates
            k_beam = getattr(self.config, "k_beam", 3)
            final_candidates = scored_candidates[:k_beam]

            # Guard against an empty candidate list before logging the top score
            if scored_candidates:
                logger.info(
                    f"Scored {len(scored_candidates)} candidates, "
                    f"top score: {scored_candidates[0]['score']:.3f}, "
                    f"keeping top {len(final_candidates)} (k_beam={k_beam})"
                )

            return final_candidates

        except Exception as e:
            logger.error(f"Candidate scoring failed: {e}")
            return candidates
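
    # Candidate dictionaries are plain dicts. A minimal sketch of the shape this
    # scorer reads and writes (field names are taken from the method bodies
    # below; the example values are made up for illustration):
    #
    #     candidate = {
    #         "node_id": "web_search",        # used by several scoring helpers
    #         "path": ["web_search"],         # full node path for this candidate
    #         "estimated_cost": 0.002,        # optional, defaults to 0.001
    #         "estimated_latency": 800,       # optional (ms), defaults to 1000
    #         "llm_evaluation": {...},        # optional SmartPathEvaluator output
    #     }
    #
    # After scoring, "score", "score_components", and "confidence" are added.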

    async def _score_candidate(
        self, candidate: Dict[str, Any], question: str, context: Dict[str, Any]
    ) -> Dict[str, float]:
        """Score a single candidate across all criteria."""
        try:
            components = {}

            # DEBUG: Log path information for debugging
            path = candidate.get("path", [candidate.get("node_id", "")])
            is_multi_hop = len(path) > 1

            if is_multi_hop:
                logger.info(f"🔍 SCORING multi-hop path: {' → '.join(path)} (depth: {len(path)})")
            else:
                logger.info(f"🔍 SCORING single-hop path: {path[0] if path else 'unknown'}")

            # Normal scoring for all paths
            components["llm"] = await self._score_llm_relevance(candidate, question, context)
            components["heuristics"] = await self._score_heuristics(candidate, question, context)
            components["prior"] = await self._score_priors(candidate, question, context)
            components["cost"] = await self._score_cost(candidate, context)
            components["latency"] = await self._score_latency(candidate, context)

            return components

        except Exception as e:
            logger.error(f"Individual candidate scoring failed: {e}")
            return {"llm": 0.0, "heuristics": 0.0, "prior": 0.0, "cost": 0.0, "latency": 0.0}

    async def _score_llm_relevance(
        self, candidate: Dict[str, Any], question: str, context: Dict[str, Any]
    ) -> float:
        """Score candidate relevance using LLM evaluation results."""
        try:
            # Use LLM evaluation results from SmartPathEvaluator
            llm_eval = candidate.get("llm_evaluation", {})

            if llm_eval:
                # Use the final relevance score from two-stage LLM evaluation
                final_scores = llm_eval.get("final_scores", {})
                relevance_score = final_scores.get("relevance", 0.5)

                logger.debug(
                    f"Using LLM relevance score: {relevance_score} for {candidate['node_id']}"
                )
                return float(relevance_score)

            # Fallback to heuristic if no LLM evaluation available
            node_id = candidate["node_id"]
            path = candidate["path"]

            # Simple keyword matching as fallback
            question_lower = question.lower()
            relevance_score = 0.5  # Default neutral score

            # Boost score for certain node types based on question content
            if "search" in question_lower and "search" in node_id.lower():
                relevance_score += 0.3
            elif "memory" in question_lower and "memory" in node_id.lower():
                relevance_score += 0.3
            elif "analyze" in question_lower and "llm" in node_id.lower():
                relevance_score += 0.3

            # Penalize very long paths
            if len(path) > 3:
                relevance_score -= 0.1

            return min(1.0, max(0.0, relevance_score))

        except Exception as e:
            logger.error(f"LLM relevance scoring failed: {e}")
            return 0.5

    async def _score_heuristics(
        self, candidate: Dict[str, Any], question: str, context: Dict[str, Any]
    ) -> float:
        """Score candidate using rule-based heuristics."""
        try:
            score = 0.0

            # Input readiness check
            score += self._check_input_readiness(candidate, context) * 0.3

            # Modality fit check
            score += self._check_modality_fit(candidate, question) * 0.3

            # Domain overlap check
            score += self._check_domain_overlap(candidate, question) * 0.2

            # Safety fit check
            score += self._check_safety_fit(candidate, context) * 0.2

            return min(1.0, max(0.0, score))

        except Exception as e:
            logger.error(f"Heuristic scoring failed: {e}")
            return 0.5

    async def _score_priors(
        self, candidate: Dict[str, Any], question: str, context: Dict[str, Any]
    ) -> float:
        """Score candidate based on historical success."""
        try:
            # TODO: Implement actual prior lookup from memory
            # For now, return neutral score
            node_id = candidate["node_id"]

            # Simple heuristic: prefer shorter paths initially
            path_length = len(candidate["path"])

            if path_length == 1:
                return 0.7  # Prefer direct paths
            elif path_length == 2:
                return 0.5  # Neutral for 2-step paths
            else:
                return 0.3  # Penalize longer paths

        except Exception as e:
            logger.error(f"Prior scoring failed: {e}")
            return 0.5

    async def _score_cost(self, candidate: Dict[str, Any], context: Dict[str, Any]) -> float:
        """Score candidate based on cost efficiency."""
        try:
            estimated_cost = candidate.get("estimated_cost", 0.001)

            # Normalize cost to 0-1 scale (inverted - lower cost is better)
            max_reasonable_cost = 0.1  # $0.10 as reasonable maximum
            normalized_cost = min(1.0, estimated_cost / max_reasonable_cost)

            # Return inverted score (1.0 for low cost, 0.0 for high cost)
            return float(1.0 - normalized_cost)

        except Exception as e:
            logger.error(f"Cost scoring failed: {e}")
            return 0.5

    async def _score_latency(self, candidate: Dict[str, Any], context: Dict[str, Any]) -> float:
        """Score candidate based on latency efficiency."""
        try:
            estimated_latency = candidate.get("estimated_latency", 1000)

            # Normalize latency to 0-1 scale (inverted - lower latency is better)
            max_reasonable_latency = 10000  # 10 seconds as reasonable maximum
            normalized_latency = min(1.0, estimated_latency / max_reasonable_latency)

            # Return inverted score (1.0 for low latency, 0.0 for high latency)
            return float(1.0 - normalized_latency)

        except Exception as e:
            logger.error(f"Latency scoring failed: {e}")
            return 0.5

    def _calculate_final_score(self, components: Dict[str, float]) -> float:
        """Calculate weighted final score from components."""
        try:
            final_score = 0.0

            for component, score in components.items():
                weight = self.score_weights.get(component, 0.0)
                final_score += weight * score

            return min(1.0, max(0.0, final_score))

        except Exception as e:
            logger.error(f"Final score calculation failed: {e}")
            return 0.0

    def _calculate_confidence(self, components: Dict[str, float]) -> float:
        """Calculate confidence based on score consistency."""
        try:
            scores = list(components.values())
            if not scores:
                return 0.0

            # High confidence when scores are consistently high
            avg_score = sum(scores) / len(scores)

            # Calculate variance to penalize inconsistent scores
            variance = sum((s - avg_score) ** 2 for s in scores) / len(scores)
            consistency_penalty = min(0.3, variance)

            confidence = avg_score - consistency_penalty
            return min(1.0, max(0.0, confidence))

        except Exception as e:
            logger.error(f"Confidence calculation failed: {e}")
            return 0.0

    def _check_input_readiness(self, candidate: Dict[str, Any], context: Dict[str, Any]) -> float:
        """Check if required inputs are available."""
        try:
            # TODO: Implement actual input requirement checking
            # For now, assume inputs are generally available
            return 0.8
        except Exception:
            return 0.5

    def _check_modality_fit(self, candidate: Dict[str, Any], question: str) -> float:
        """Check if candidate matches question modality."""
        try:
            node_id = candidate["node_id"].lower()
            question_lower = question.lower()

            # Simple modality matching
            if any(word in question_lower for word in ["image", "picture", "visual"]):
                if "vision" in node_id or "image" in node_id:
                    return 1.0
                else:
                    return 0.3

            # Text processing is default
            return 0.7

        except Exception:
            return 0.5

    def _check_domain_overlap(self, candidate: Dict[str, Any], question: str) -> float:
        """Check domain overlap between candidate and question."""
        try:
            # TODO: Implement semantic similarity checking
            # For now, use simple keyword overlap
            node_id = candidate["node_id"].lower()
            question_words = set(question.lower().split())
            node_words = set(node_id.split("_"))

            overlap = len(question_words & node_words)
            max_possible = min(len(question_words), len(node_words))

            if max_possible == 0:
                return 0.5

            return overlap / max_possible

        except Exception:
            return 0.5

    def _check_safety_fit(self, candidate: Dict[str, Any], context: Dict[str, Any]) -> float:
        """Check if candidate meets safety requirements."""
        try:
            # TODO: Implement actual safety checking
            # For now, assume most paths are safe
            return 0.9
        except Exception:
            return 0.5
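

# ---------------------------------------------------------------------------
# Usage sketch (not part of the module's public API): drives PathScorer with a
# minimal stand-in config built from SimpleNamespace. The weight values, node
# ids, and cost/latency estimates below are assumptions for illustration only.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    from types import SimpleNamespace

    config = SimpleNamespace(
        score_weights={"llm": 0.4, "heuristics": 0.25, "prior": 0.15, "cost": 0.1, "latency": 0.1},
        k_beam=2,
    )
    scorer = PathScorer(config)

    candidates = [
        {"node_id": "web_search", "path": ["web_search"], "estimated_cost": 0.002},
        {"node_id": "memory_reader", "path": ["memory_reader"], "estimated_latency": 400},
        {"node_id": "llm_analyzer", "path": ["memory_reader", "llm_analyzer"]},
    ]

    # score_candidates returns the top-k_beam candidates, sorted by final score
    ranked = asyncio.run(
        scorer.score_candidates(candidates, question="search the web for recent news", context={})
    )
    for c in ranked:
        print(f"{c['node_id']}: score={c['score']:.3f} confidence={c['confidence']:.3f}")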