# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning
from datetime import datetime
from .base_node import BaseNode
[docs]
class FailoverNode(BaseNode):
"""
A node that implements failover logic by trying multiple child nodes in sequence.
If one child fails, it tries the next one until one succeeds or all fail.
"""
[docs]
def __init__(self, node_id, children=None, queue=None, prompt=None, **kwargs):
"""
Initialize the failover node.
Args:
node_id (str): Unique identifier for the node.
children (list): List of child nodes to try in sequence.
queue (list): Queue of agents or nodes to be processed.
prompt (str): Prompt for the node (optional for failover).
**kwargs: Additional parameters.
"""
# Call parent constructor
super().__init__(node_id, prompt or "", queue or [], **kwargs)
# Set failover-specific attributes
self.children = children or []
self.agent_id = node_id # Ensure agent_id is set for proper identification
[docs]
async def run(self, input_data):
"""
Run the failover logic by trying each child node in sequence.
Args:
input_data: Input data to pass to child nodes.
Returns:
dict: Result from the first successful child node.
Raises:
RuntimeError: If all child nodes fail.
"""
last_error = None
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][INFO] Starting failover with {len(self.children)} children",
)
for i, child in enumerate(self.children):
child_id = getattr(
child,
"agent_id",
getattr(child, "node_id", f"unknown_child_{i}"),
)
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][INFO] Trying child {i + 1}/{len(self.children)}: {child_id}",
)
try:
# Render prompt for child before running (fix for {{ input }} template access)
child_payload = input_data.copy()
if hasattr(child, "prompt") and child.prompt:
try:
from jinja2 import Template
formatted_prompt = Template(child.prompt).render(**input_data)
child_payload["formatted_prompt"] = formatted_prompt
except Exception:
# If rendering fails, use original prompt as fallback
child_payload["formatted_prompt"] = child.prompt
# Try running the current child node
if hasattr(child, "run") and callable(child.run):
# Check if the child's run method is async
import asyncio
if asyncio.iscoroutinefunction(child.run):
result = await child.run(child_payload)
else:
result = child.run(child_payload)
else:
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][ERROR] Child '{child_id}' has no run method",
)
continue
# Check if result is valid (not None, not empty, and contains meaningful data)
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][DEBUG] Child '{child_id}' returned result type: {type(result)}",
)
if result:
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][DEBUG] Result preview: {str(result)[:200]}...",
)
if result and self._is_valid_result(result):
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][SUCCESS] Agent '{child_id}' succeeded",
)
# Return result in a more accessible format
return {
"result": result,
"successful_child": child_id,
child_id: result, # Keep backward compatibility
}
else:
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][INFO] Agent '{child_id}' returned empty/invalid result",
)
except Exception as e:
# Log the failure and continue to next child
last_error = e
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][WARNING] Agent '{child_id}' failed: {e}",
)
# Add delay before trying next child to avoid rate limiting
if "ratelimit" in str(e).lower() or "rate" in str(e).lower():
import asyncio
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][INFO] Rate limit detected, waiting 2 seconds before next attempt",
)
await asyncio.sleep(2)
# If we get here, all children failed
error_msg = (
f"All fallback agents failed. Last error: {last_error}"
if last_error
else "All fallback agents failed."
)
print(f"{datetime.now()} > [ORKA][NODE][FAILOVER][ERROR] {error_msg}")
# Return structured error result instead of raising exception
return {
"result": error_msg,
"status": "failed",
"successful_child": None,
"error": str(last_error) if last_error else "All children failed",
}
def _is_valid_result(self, result):
"""
Check if a result is valid and meaningful.
Args:
result: The result to validate
Returns:
bool: True if result is valid, False otherwise
"""
if not result:
return False
# If result is a dict, check for meaningful content
if isinstance(result, dict):
# Check for common success indicators
if result.get("status") == "error":
return False
# Check for response content
if "response" in result:
response = result["response"]
if not response or response in ["NONE", "", None]:
return False
# Check if response contains HTML tags (likely irrelevant web search results)
if isinstance(response, str) and ("<" in response and ">" in response):
if "tag" in response.lower() or "html" in response.lower():
return False
# Check for result content
if "result" in result:
inner_result = result["result"]
if isinstance(inner_result, dict) and "response" in inner_result:
response = inner_result["response"]
if not response or response in ["NONE", "", None]:
return False
# Check for HTML content in nested response
if isinstance(response, str) and ("<" in response and ">" in response):
if "tag" in response.lower() or "html" in response.lower():
return False
# If result is a list, check if it's not empty and doesn't contain HTML content or error messages
elif isinstance(result, list):
if len(result) == 0:
return False
# Check if list contains error messages or HTML-like content
for item in result:
if isinstance(item, str):
item_lower = item.lower()
# Check for error messages
error_indicators = [
"failed",
"error",
"ratelimit",
"rate limit",
"timeout",
"connection error",
"404",
"500",
"503",
]
if any(indicator in item_lower for indicator in error_indicators):
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][DEBUG] Rejecting error message: {item[:50]}...",
)
return False
# More comprehensive HTML detection
if (
("<" in item and ">" in item)
or "tag" in item_lower
or "html" in item_lower
or "element" in item_lower
):
# Additional checks for common HTML-related terms
html_indicators = [
"input",
"form",
"attribute",
"w3schools",
"css",
"javascript",
"web-based",
]
if any(indicator in item_lower for indicator in html_indicators):
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][DEBUG] Rejecting HTML content: {item[:50]}...",
)
return False
# If result is a string, check if it's meaningful and not HTML or error message
elif isinstance(result, str):
if result in ["NONE", "", None]:
return False
result_lower = result.lower()
# Check for error messages
error_indicators = [
"failed",
"error",
"ratelimit",
"rate limit",
"timeout",
"connection error",
"404",
"500",
"503",
]
if any(indicator in result_lower for indicator in error_indicators):
print(
f"{datetime.now()} > [ORKA][NODE][FAILOVER][DEBUG] Rejecting error message: {result[:50]}...",
)
return False
# Check for HTML content
if "<" in result and ">" in result:
if "tag" in result_lower or "html" in result_lower:
return False
return True