Source code for orka.orchestrator.graph_introspection

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-reasoning

"""
Graph Introspection Module
==========================

Discovers and analyzes available paths in the workflow graph.
This module implements intelligent path discovery with cycle detection,
constraint checking, and bounded exploration.
"""

import logging
from typing import Any, Dict, List, Optional, Set, Tuple

from .graph_api import EdgeDescriptor, GraphState, NodeDescriptor

logger = logging.getLogger(__name__)
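
# High-level flow of the discovery algorithm implemented by GraphIntrospector
# below (summarized from the method bodies for orientation):
#   1. Collect eligible immediate neighbors of the current node; a GraphScout
#      node may target any unvisited, non-GraphScout agent, while other nodes
#      follow their outgoing edges.
#   2. Emit single-hop candidates; memory reader/writer nodes are filtered out
#      and re-added with "special_routing" metadata for the execution engine.
#   3. If max_depth > 1, recursively extend paths (the visited set prevents
#      cycles) and append a response builder so multi-hop paths terminate.
#   4. Filter candidates against feasibility, join, and budget constraints;
#      beam limiting (k_beam) is deferred to the downstream scoring system.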


class GraphIntrospector:
    """
    Intelligent graph exploration and path discovery.

    Discovers available paths from the current position with:
    - Bounded depth exploration
    - Cycle detection and prevention
    - Constraint-based filtering
    - Join feasibility analysis
    """

    def __init__(self, config: Any):
        """Initialize graph introspector with configuration."""
        self.config = config
        self.max_depth = getattr(config, "max_depth", 4)  # Default to 4 if not specified
        self.k_beam = config.k_beam
        logger.debug(f"GraphIntrospector initialized with max_depth={self.max_depth}")
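
    # Illustrative configuration sketch (not the canonical config class): any
    # object exposing ``k_beam`` (required) and optionally ``max_depth`` works,
    # e.g. ``GraphIntrospector(SimpleNamespace(k_beam=3, max_depth=4))`` with
    # ``from types import SimpleNamespace``.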

    def _filter_memory_agents_from_candidates(
        self, neighbors: List[str], graph_state: GraphState
    ) -> List[str]:
        """
        Filter out memory agents from regular candidate discovery.

        Memory agents will be handled specially in the execution engine.

        Args:
            neighbors: List of neighbor node IDs
            graph_state: Current graph state with orchestrator info

        Returns:
            Filtered list of neighbors excluding memory agents
        """
        try:
            filtered_neighbors = []
            for neighbor_id in neighbors:
                if not self._is_memory_agent(neighbor_id, graph_state):
                    filtered_neighbors.append(neighbor_id)
                else:
                    logger.debug(f"Filtering memory agent {neighbor_id} from regular candidates")

            memory_count = len(neighbors) - len(filtered_neighbors)
            logger.info(f"Filtered {memory_count} memory agents from {len(neighbors)} neighbors")

            # Debug: Log node types for all neighbors
            for neighbor_id in neighbors:
                node_desc = graph_state.nodes.get(neighbor_id)
                if node_desc:
                    logger.debug(
                        f"Neighbor {neighbor_id}: type='{node_desc.type}', is_memory={node_desc.type in ['MemoryReaderNode', 'MemoryWriterNode']}"
                    )
                else:
                    logger.debug(f"Neighbor {neighbor_id}: no node descriptor found")

            return filtered_neighbors

        except Exception as e:
            logger.error(f"Failed to filter memory agents: {e}")
            return neighbors  # Fallback to original list

    def _add_memory_agents_for_shortlist(
        self, neighbors: List[str], graph_state: GraphState
    ) -> List[Dict[str, Any]]:
        """
        Add memory agents back as candidates for shortlist decisions.

        They will be specially positioned by the execution engine.

        Args:
            neighbors: List of neighbor node IDs
            graph_state: Current graph state with orchestrator info

        Returns:
            List of memory agent candidates
        """
        try:
            memory_candidates = []
            for neighbor_id in neighbors:
                if self._is_memory_agent(neighbor_id, graph_state):
                    operation = self._get_memory_operation(neighbor_id, graph_state)
                    candidate = {
                        "node_id": neighbor_id,
                        "path": [neighbor_id],
                        "depth": 1,
                        "feasible": True,
                        "constraints_met": True,
                        "memory_operation": operation,  # Add operation metadata
                        "special_routing": True,  # Mark for special handling
                    }
                    memory_candidates.append(candidate)
                    logger.debug(
                        f"Added memory agent {neighbor_id} ({operation}) for special routing"
                    )

            if memory_candidates:
                logger.info(f"Added {len(memory_candidates)} memory agents for intelligent routing")

            return memory_candidates

        except Exception as e:
            logger.error(f"Failed to add memory agents for shortlist: {e}")
            return []

    def _is_memory_agent(self, agent_id: str, graph_state: GraphState) -> bool:
        """Check if an agent is a memory agent (reader or writer)."""
        try:
            # Get the node descriptor from graph state
            node_desc = graph_state.nodes.get(agent_id)
            if node_desc:
                # Check the node type for memory agents
                node_type = node_desc.type
                return node_type in ["MemoryReaderNode", "MemoryWriterNode"]
            return False
        except Exception as e:
            logger.error(f"Failed to check if {agent_id} is memory agent: {e}")
            return False

    def _get_memory_operation(self, agent_id: str, graph_state: GraphState) -> str:
        """Get the operation type (read/write) for a memory agent."""
        try:
            # Get the node descriptor from graph state
            node_desc = graph_state.nodes.get(agent_id)
            if node_desc:
                node_type = node_desc.type
                if node_type == "MemoryReaderNode":
                    return "read"
                elif node_type == "MemoryWriterNode":
                    return "write"
            return "unknown"
        except Exception as e:
            logger.error(f"Failed to get memory operation for {agent_id}: {e}")
            return "unknown"
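
    # Illustrative example of the two candidate shapes produced above (node IDs
    # are hypothetical, not part of any real workflow):
    #   regular agent -> {"node_id": "web_search", "path": ["web_search"],
    #                     "depth": 1, "feasible": True, "constraints_met": True}
    #   memory agent  -> same keys plus {"memory_operation": "read",
    #                     "special_routing": True}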

    async def discover_paths(
        self,
        graph_state: GraphState,
        question: str,
        context: Dict[str, Any],
        executing_node: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """
        Discover candidate paths from current position.

        Args:
            graph_state: Complete graph state
            question: The question/query to route
            context: Execution context
            executing_node: Node initiating discovery (defaults to graph_state.current_node)

        Returns:
            List of candidate path dictionaries
        """
        try:
            # Use executing_node if provided, otherwise fall back to graph_state.current_node
            current_node = executing_node if executing_node else graph_state.current_node
            visited = graph_state.visited_nodes.copy()

            logger.info(f"Discovering paths from node: {current_node}")

            # Get immediate neighbors
            neighbors = self._get_eligible_neighbors(graph_state, current_node, visited)

            if not neighbors:
                logger.warning(f"No eligible neighbors found for node: {current_node}")
                return []

            # Explore paths with bounded search
            candidates = []

            # Depth 1: Direct neighbors (single-hop paths)
            # Filter out memory agents from regular candidate discovery
            filtered_neighbors = self._filter_memory_agents_from_candidates(neighbors, graph_state)

            for neighbor_id in filtered_neighbors:
                candidate = {
                    "node_id": neighbor_id,
                    "path": [neighbor_id],  # GraphScout direct routing - single node path
                    "depth": 1,
                    "feasible": True,
                    "constraints_met": True,
                }
                candidates.append(candidate)

            # Add memory agents back with special handling for shortlist decision
            memory_candidates = self._add_memory_agents_for_shortlist(neighbors, graph_state)
            candidates.extend(memory_candidates)

            # Depth 2+: Extended paths if configured (multi-hop paths)
            if self.max_depth > 1:
                extended_candidates = await self._explore_extended_paths(
                    graph_state, neighbors, visited, question, context
                )
                # Filter out any single-hop duplicates from extended exploration
                unique_extended = []
                for ext_candidate in extended_candidates:
                    if ext_candidate.get("depth", 1) > 1:  # Only keep multi-hop paths
                        unique_extended.append(ext_candidate)
                candidates.extend(unique_extended)
                logger.info(
                    f"Extended path exploration: found {len(unique_extended)} additional multi-hop candidates"
                )

            # Filter and rank candidates
            filtered_candidates = self._filter_candidates(candidates, graph_state, context)

            # Don't limit to beam width here - let scoring system handle prioritization
            # This allows all candidates (single-hop + multi-hop) to compete fairly

            logger.info(f"Discovered {len(filtered_candidates)} candidate paths")
            return filtered_candidates

        except Exception as e:
            logger.error(f"Path discovery failed: {e}")
            return []
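
    # Illustrative call site (sketch only; assumes an already-populated
    # GraphState named ``state`` and a running event loop):
    #   candidates = await introspector.discover_paths(
    #       state, question="...", context={}, executing_node="graph_scout"
    #   )
    #   # Each entry carries "node_id", "path", "depth", "feasible", and
    #   # "constraints_met"; budget checks may add "estimated_cost" and
    #   # "estimated_latency".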

    def _get_eligible_neighbors(
        self, graph_state: GraphState, current_node: str, visited: Set[str]
    ) -> List[str]:
        """Get eligible neighbor nodes from current position."""
        neighbors = []

        try:
            logger.debug(
                f"Looking for neighbors of '{current_node}' in {len(graph_state.edges)} edges"
            )
            logger.debug(f"Available nodes: {list(graph_state.nodes.keys())}")
            logger.debug(f"Edges: {[(e.src, e.dst) for e in graph_state.edges]}")

            # SPECIAL CASE: If current node is GraphScout, it can route to ANY available agent
            # This provides universal compatibility across ALL orchestrator types (sequential, dynamic, fork/join, etc.)
            if current_node in graph_state.nodes:
                current_node_obj = graph_state.nodes[current_node]
                current_is_graphscout = (
                    hasattr(current_node_obj, "type")
                    and "graphscout" in current_node_obj.type.lower()
                )
            else:
                current_is_graphscout = False

            if current_is_graphscout:
                logger.info(
                    "GraphScout detected - enabling universal agent visibility across all orchestrator types"
                )

                # GraphScout can route to ANY agent in the workflow (except itself)
                # This works for ALL orchestrator strategies: sequential, dynamic, fork/join, parallel, etc.
                available_agents = []
                for node_id, node_obj in graph_state.nodes.items():
                    # Skip self-routing to prevent infinite loops
                    if node_id == current_node:
                        logger.debug(f"Skipping self-routing to {node_id}")
                        continue

                    # Skip already visited nodes to prevent cycles
                    if node_id in visited:
                        logger.debug(f"Skipping visited node: {node_id}")
                        continue

                    # Skip other GraphScout agents to prevent routing loops
                    target_is_graphscout = (
                        hasattr(node_obj, "type") and "graphscout" in node_obj.type.lower()
                    )
                    if target_is_graphscout:
                        logger.debug(f"Skipping other GraphScout agent: {node_id}")
                        continue

                    available_agents.append(node_id)
                    logger.debug(
                        f"GraphScout can route to: {node_id} (type: {getattr(node_obj, 'type', 'unknown')})"
                    )

                logger.info(
                    f"GraphScout universal routing: found {len(available_agents)} available agents: {available_agents}"
                )
                logger.info(
                    "GraphScout supports ALL orchestrator types: sequential, dynamic, fork/join, parallel"
                )
                return available_agents

            # NORMAL CASE: Follow sequential edges for non-GraphScout agents
            # Find outgoing edges from current node
            for edge in graph_state.edges:
                logger.debug(f"Checking edge: {edge.src} -> {edge.dst}")
                if edge.src == current_node:
                    target_node = edge.dst
                    logger.debug(f"Found outgoing edge to: {target_node}")

                    # Skip already visited nodes (cycle prevention)
                    if target_node in visited:
                        logger.debug(f"Skipping visited node: {target_node}")
                        continue

                    # Check if target node exists
                    if target_node not in graph_state.nodes:
                        logger.warning(f"Target node {target_node} not found in graph")
                        continue

                    # Skip GraphScout agents only if we're currently IN a GraphScout agent
                    # to prevent infinite loops (GraphScout routing to itself)
                    if target_node in graph_state.nodes and current_node in graph_state.nodes:
                        current_node_obj = graph_state.nodes[current_node]
                        target_node_obj = graph_state.nodes[target_node]

                        # Only skip if current node is GraphScout and target is also GraphScout
                        current_is_graphscout = (
                            hasattr(current_node_obj, "type")
                            and "graphscout" in current_node_obj.type.lower()
                        )
                        target_is_graphscout = (
                            hasattr(target_node_obj, "type")
                            and "graphscout" in target_node_obj.type.lower()
                        )
                        logger.info(
                            f"Edge check: current_is_graphscout={current_is_graphscout}, target_is_graphscout={target_is_graphscout}"
                        )

                        if current_is_graphscout and target_is_graphscout:
                            logger.debug(
                                f"Skipping GraphScout->GraphScout routing: {current_node} -> {target_node}"
                            )
                            continue

                    # Check edge conditions
                    if self._check_edge_condition(edge, graph_state):
                        neighbors.append(target_node)
                        logger.debug(f"Added eligible neighbor: {target_node}")

            logger.debug(f"Found {len(neighbors)} eligible neighbors: {neighbors}")
            return neighbors

        except Exception as e:
            logger.error(f"Failed to get eligible neighbors: {e}")
            return []

    def _check_edge_condition(self, edge: EdgeDescriptor, graph_state: GraphState) -> bool:
        """Check if edge condition is satisfied."""
        try:
            # If no condition, edge is always traversable
            if not edge.condition:
                return True

            # TODO: Implement condition evaluation
            # This would evaluate conditions like:
            # - Previous agent outputs
            # - Runtime state
            # - Budget constraints

            return True

        except Exception as e:
            logger.error(f"Failed to check edge condition: {e}")
            return False

    async def _explore_extended_paths(
        self,
        graph_state: GraphState,
        start_nodes: List[str],
        visited: Set[str],
        question: str,
        context: Dict[str, Any],
    ) -> List[Dict[str, Any]]:
        """Explore multi-step paths from starting nodes."""
        extended_candidates = []

        try:
            # Check if this is GraphScout-initiated path discovery
            initiating_node = context.get("executing_node", graph_state.current_node)
            is_graphscout_discovery = self._is_graphscout_node(graph_state, initiating_node)

            for start_node in start_nodes:
                # Explore paths starting from this node
                paths = await self._explore_from_node(
                    graph_state,
                    start_node,
                    visited | {start_node},
                    [start_node],
                    1,
                    is_graphscout_discovery,
                )

                for path in paths:
                    logger.debug(f"Processing path from _explore_from_node: {' → '.join(path)}")
                    # Ensure all paths end with a response builder
                    terminal_path = await self._ensure_terminal_path(graph_state, path, visited)
                    logger.debug(f"After _ensure_terminal_path: {' → '.join(terminal_path)}")

                    candidate = {
                        "node_id": terminal_path[0],  # First node in path (starting point for routing)
                        "path": terminal_path,
                        "depth": len(terminal_path),
                        "feasible": True,
                        "constraints_met": True,
                    }
                    extended_candidates.append(candidate)

                    if len(terminal_path) > 1:
                        logger.info(
                            f"Multi-hop candidate added: {' → '.join(terminal_path)} (depth: {len(terminal_path)})"
                        )
                    if len(terminal_path) > len(path):
                        logger.info(
                            f"GraphScout: Enhanced path to ensure terminal: {' → '.join(path)} => {' → '.join(terminal_path)}"
                        )

            return extended_candidates

        except Exception as e:
            logger.error(f"Extended path exploration failed: {e}")
            return []

    async def _explore_from_node(
        self,
        graph_state: GraphState,
        current_node: str,
        visited: Set[str],
        current_path: List[str],
        depth: int,
        is_graphscout_discovery: bool = False,
    ) -> List[List[str]]:
        """Recursively explore paths from a node."""
        paths = []

        try:
            # Stop if max depth reached
            if depth >= self.max_depth:
                return [current_path]

            # Get neighbors of current node
            # For GraphScout, use universal routing even for extended paths
            if self._is_graphscout_node(graph_state, current_node):
                neighbors = self._get_eligible_neighbors(graph_state, current_node, visited)
            else:
                neighbors = self._get_graph_neighbors(graph_state, current_node, visited)

            if not neighbors:
                # Dead end - return current path
                return [current_path]

            # Explore each neighbor
            for neighbor in neighbors:
                new_path = current_path + [neighbor]
                new_visited = visited | {neighbor}

                # Always add the current path as a valid multi-hop path
                if len(new_path) > 1:  # Only multi-hop paths
                    paths.append(new_path)
                    logger.info(
                        f"GraphScout: Created {len(new_path)}-hop path: {' → '.join(new_path)}"
                    )

                # For GraphScout-initiated discovery, stop at response builders to avoid infinite exploration
                if is_graphscout_discovery and self._is_response_builder_node(
                    graph_state, neighbor
                ):
                    logger.debug(f"Stopping exploration at response builder: {neighbor}")
                    continue  # Don't explore further from response builders

                # Recursively explore from neighbor if we haven't reached max depth
                if depth < self.max_depth:
                    sub_paths = await self._explore_from_node(
                        graph_state,
                        neighbor,
                        new_visited,
                        new_path,
                        depth + 1,
                        is_graphscout_discovery,
                    )
                    paths.extend(sub_paths)

            return paths

        except Exception as e:
            logger.error(f"Node exploration failed: {e}")
            return [current_path]

    def _get_graph_neighbors(
        self, graph_state: GraphState, current_node: str, visited: Set[str]
    ) -> List[str]:
        """Get neighbors following actual graph edges (not GraphScout universal routing)."""
        neighbors = []

        try:
            # Find outgoing edges from current node
            for edge in graph_state.edges:
                if edge.src == current_node:
                    target_node = edge.dst

                    # Skip already visited nodes (cycle prevention)
                    if target_node in visited:
                        continue

                    # Check if target node exists
                    if target_node not in graph_state.nodes:
                        continue

                    # Skip GraphScout agents to prevent routing loops
                    if target_node in graph_state.nodes:
                        target_node_obj = graph_state.nodes[target_node]
                        target_is_graphscout = (
                            hasattr(target_node_obj, "type")
                            and "graphscout" in target_node_obj.type.lower()
                        )
                        if target_is_graphscout:
                            continue

                    # Check edge conditions
                    if self._check_edge_condition(edge, graph_state):
                        neighbors.append(target_node)

            return neighbors

        except Exception as e:
            logger.error(f"Failed to get graph neighbors: {e}")
            return []

    def _is_graphscout_node(self, graph_state: GraphState, node_id: str) -> bool:
        """Check if a node is a GraphScout agent."""
        try:
            if node_id in graph_state.nodes:
                node_obj = graph_state.nodes[node_id]
                return hasattr(node_obj, "type") and "graphscout" in node_obj.type.lower()
        except Exception:
            pass
        return False

    def _is_response_builder_node(self, graph_state: GraphState, node_id: str) -> bool:
        """Check if a node is a response builder using capabilities and type."""
        try:
            if node_id in graph_state.nodes:
                node_obj = graph_state.nodes[node_id]

                # Check capabilities first (most reliable)
                if hasattr(node_obj, "capabilities"):
                    capabilities = getattr(node_obj, "capabilities", [])
                    if "answer_emit" in capabilities or "response_generation" in capabilities:
                        return True

                # Fallback to type-based detection
                if hasattr(node_obj, "type"):
                    agent_type = node_obj.type.lower()
                    if (
                        any(
                            term in agent_type
                            for term in [
                                "localllm",
                                "local_llm",
                                "answer",
                                "response",
                                "builder",
                            ]
                        )
                        and "classification" not in agent_type
                    ):
                        return True

            # Name-based fallback
            return (
                "response_builder" in node_id.lower()
                or "answer" in node_id.lower()
                or "final" in node_id.lower()
            )
        except Exception:
            pass
        return False

    async def _ensure_terminal_path(
        self, graph_state: GraphState, path: List[str], visited: Set[str]
    ) -> List[str]:
        """
        Ensure a path ends with a response builder.

        If it doesn't, append the best available response builder.

        Args:
            graph_state: Current graph state
            path: Original path
            visited: Set of visited nodes

        Returns:
            Path guaranteed to end with a response builder
        """
        try:
            # Check if path already ends with a response builder
            if path and self._is_response_builder_node(graph_state, path[-1]):
                return path

            # Find the best response builder to append
            response_builders = []
            path_set = set(path)  # Convert to set for faster lookup

            for node_id in graph_state.nodes:
                if (
                    self._is_response_builder_node(graph_state, node_id)
                    and node_id not in path_set  # Avoid cycles
                    and node_id not in visited  # Respect visited constraints
                ):
                    response_builders.append(node_id)

            if not response_builders:
                # No available response builders, return original path
                logger.warning(
                    f"No available response builders to append to path: {' → '.join(path)}"
                )
                return path

            # Choose the best response builder (prefer "response_builder" in name)
            best_builder = None
            for builder in response_builders:
                if "response_builder" in builder.lower():
                    best_builder = builder
                    break

            if not best_builder:
                best_builder = response_builders[0]  # Take first available

            # Append the response builder to create terminal path
            terminal_path = path + [best_builder]
            logger.debug(f"Enhanced path with terminal agent: {' → '.join(path)} + {best_builder}")

            return terminal_path

        except Exception as e:
            logger.error(f"Failed to ensure terminal path: {e}")
            return path  # Return original path on error

    def _filter_candidates(
        self, candidates: List[Dict[str, Any]], graph_state: GraphState, context: Dict[str, Any]
    ) -> List[Dict[str, Any]]:
        """Filter candidates based on constraints and feasibility."""
        filtered = []

        try:
            for candidate in candidates:
                # Check basic feasibility
                if not self._check_path_feasibility(candidate, graph_state):
                    continue

                # Check join requirements
                if not self._check_join_feasibility(candidate, graph_state):
                    continue

                # Check resource constraints
                if not self._check_resource_constraints(candidate, graph_state):
                    continue

                filtered.append(candidate)

            # Don't sort by path length here - let the scoring system handle prioritization
            # This allows multi-hop paths to compete fairly with single-hop paths

            logger.debug(f"Filtered candidates: {len(filtered)} total")
            return filtered

        except Exception as e:
            logger.error(f"Candidate filtering failed: {e}")
            return candidates

    def _check_path_feasibility(self, candidate: Dict[str, Any], graph_state: GraphState) -> bool:
        """Check if path is feasible to execute."""
        try:
            path = candidate["path"]

            # Check all nodes in path exist
            for node_id in path:
                if node_id not in graph_state.nodes:
                    logger.debug(f"Path infeasible: node {node_id} not found")
                    return False

            # SPECIAL CASE: For GraphScout direct routing (depth=1), skip edge connectivity checks
            # GraphScout can route directly to any available agent regardless of edges
            if candidate.get("depth", 1) == 1 and len(path) == 1:
                logger.debug(f"GraphScout direct routing to {path[0]} - skipping edge checks")
                return True

            # Check path connectivity for multi-step paths
            for i in range(len(path) - 1):
                src = path[i]
                dst = path[i + 1]

                # Find edge between nodes
                edge_found = False
                for edge in graph_state.edges:
                    if edge.src == src and edge.dst == dst:
                        edge_found = True
                        break

                if not edge_found:
                    logger.debug(f"Path infeasible: no edge from {src} to {dst}")
                    return False

            return True

        except Exception as e:
            logger.error(f"Path feasibility check failed: {e}")
            return False

    def _check_join_feasibility(self, candidate: Dict[str, Any], graph_state: GraphState) -> bool:
        """Check if path leads to satisfiable joins."""
        try:
            # TODO: Implement join feasibility analysis
            # This would check if downstream joins can be satisfied
            # given the current branch and available parallel paths

            return True

        except Exception as e:
            logger.error(f"Join feasibility check failed: {e}")
            return False

    def _check_resource_constraints(
        self, candidate: Dict[str, Any], graph_state: GraphState
    ) -> bool:
        """Check if path meets resource constraints."""
        try:
            path = candidate["path"]
            budgets = graph_state.budgets

            # Estimate path cost
            estimated_cost = 0.0
            estimated_latency = 0.0

            for node_id in path:
                if node_id in graph_state.nodes:
                    node = graph_state.nodes[node_id]
                    cost_model = node.cost_model
                    estimated_cost += cost_model.get("base_cost", 0.001)
                    estimated_latency += cost_model.get("latency_estimate_ms", 1000)

            # Check against budgets
            max_cost = budgets.get("max_cost_usd", 1.0)
            max_latency = budgets.get("max_latency_ms", 30000)

            if estimated_cost > max_cost:
                logger.debug(f"Path exceeds cost budget: {estimated_cost} > {max_cost}")
                return False

            if estimated_latency > max_latency:
                logger.debug(f"Path exceeds latency budget: {estimated_latency} > {max_latency}")
                return False

            # Store estimates in candidate
            candidate["estimated_cost"] = estimated_cost
            candidate["estimated_latency"] = estimated_latency

            return True

        except Exception as e:
            logger.error(f"Resource constraint check failed: {e}")
            return True  # Default to allowing if check fails
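

# Illustrative budget and cost-model shapes consumed by _check_resource_constraints
# (field names taken from the lookups above; the example values are assumptions,
# not defaults defined elsewhere in graph_api):
#   node.cost_model    -> {"base_cost": 0.001, "latency_estimate_ms": 1000}
#   graph_state.budgets -> {"max_cost_usd": 1.0, "max_latency_ms": 30000}
# A path is rejected as soon as its summed base costs exceed max_cost_usd or its
# summed latency estimates exceed max_latency_ms; otherwise the totals are written
# back onto the candidate as "estimated_cost" and "estimated_latency".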