#!/usr/bin/env python3
"""
Error handling wrapper for OrKa Orchestrator.
Provides comprehensive error tracking and telemetry without modifying the core orchestrator logic.
"""
import json
import os
import traceback
from datetime import datetime, timezone
from typing import Dict, List, Optional
class OrkaErrorHandler:
"""
Comprehensive error handling system for OrKa orchestrator.
Tracks errors, retries, status codes, and provides detailed debugging reports.
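
    Example (illustrative sketch; assumes an orchestrator instance is already
    constructed and that this coroutine is awaited from async code):

        handler = OrkaErrorHandler(orchestrator)
        result = await handler.run_with_error_handling(input_data)
        print(result.get("status"), result.get("report_path"))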
"""
def __init__(self, orchestrator):
self.orchestrator = orchestrator
self.error_telemetry = {
"errors": [], # List of all errors encountered
"retry_counters": {}, # Per-agent retry counts
"partial_successes": [], # Agents that succeeded after retries
"silent_degradations": [], # JSON parsing failures that fell back to raw text
"status_codes": {}, # HTTP status codes for API calls
"execution_status": "running", # overall status: running, completed, failed, partial
"critical_failures": [], # Failures that stopped execution
"recovery_actions": [], # Actions taken to recover from errors
}
    def record_error(
        self,
        error_type: str,
        agent_id: str,
        error_msg: str,
        exception: Optional[Exception] = None,
        step: Optional[int] = None,
        status_code: Optional[int] = None,
        recovery_action: Optional[str] = None,
    ):
"""Record an error in the error telemetry system."""
error_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"type": error_type,
"agent_id": agent_id,
"message": error_msg,
"step": step or getattr(self.orchestrator, "step_index", 0),
"run_id": getattr(self.orchestrator, "run_id", "unknown"),
}
        if exception:
            error_entry["exception"] = {
                "type": type(exception).__name__,
                "message": str(exception),
                # Format the traceback of the exception that was passed in, rather than
                # relying on this method being called inside an active except block.
                "traceback": "".join(
                    traceback.format_exception(type(exception), exception, exception.__traceback__),
                ),
            }
if status_code:
error_entry["status_code"] = status_code
self.error_telemetry["status_codes"][agent_id] = status_code
if recovery_action:
error_entry["recovery_action"] = recovery_action
self.error_telemetry["recovery_actions"].append(
{
"timestamp": error_entry["timestamp"],
"agent_id": agent_id,
"action": recovery_action,
},
)
self.error_telemetry["errors"].append(error_entry)
print(f"🚨 [ORKA-ERROR] {error_type} in {agent_id}: {error_msg}")
def record_silent_degradation(self, agent_id: str, degradation_type: str, details: str):
"""Record silent degradations like JSON parsing failures."""
self.error_telemetry["silent_degradations"].append(
{
"timestamp": datetime.now(timezone.utc).isoformat(),
"agent_id": agent_id,
"type": degradation_type,
"details": details,
},
)
    def save_comprehensive_error_report(self, logs: List[Dict], final_error: Optional[Exception] = None):
"""Save comprehensive error report with all logged data up to the failure point."""
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d_%H%M%S")
log_dir = os.getenv("ORKA_LOG_DIR", "logs")
os.makedirs(log_dir, exist_ok=True)
# Determine final execution status
if final_error:
self.error_telemetry["execution_status"] = "failed"
self.error_telemetry["critical_failures"].append(
{
"timestamp": datetime.now(timezone.utc).isoformat(),
"error": str(final_error),
"step": getattr(self.orchestrator, "step_index", 0),
},
)
elif self.error_telemetry["errors"]:
self.error_telemetry["execution_status"] = "partial"
else:
self.error_telemetry["execution_status"] = "completed"
# Generate meta report even on failure
try:
meta_report = self.orchestrator._generate_meta_report(logs)
except Exception as e:
self.record_error(
"meta_report_generation",
"meta_report",
f"Failed to generate meta report: {e}",
e,
)
meta_report = {
"error": "Failed to generate meta report",
"partial_data": {
"total_agents_executed": len(logs),
"run_id": getattr(self.orchestrator, "run_id", "unknown"),
},
}
# Create comprehensive error report
error_report = {
"orka_execution_report": {
"run_id": getattr(self.orchestrator, "run_id", "unknown"),
"timestamp": timestamp,
"execution_status": self.error_telemetry["execution_status"],
"error_telemetry": self.error_telemetry,
"meta_report": meta_report,
"execution_logs": logs,
"total_steps_attempted": getattr(self.orchestrator, "step_index", 0),
"total_errors": len(self.error_telemetry["errors"]),
"total_retries": sum(self.error_telemetry["retry_counters"].values()),
"agents_with_errors": list(
set(error["agent_id"] for error in self.error_telemetry["errors"]),
),
"memory_snapshot": self._capture_memory_snapshot(),
},
}
# Save error report
error_report_path = os.path.join(log_dir, f"orka_error_report_{timestamp}.json")
try:
            with open(error_report_path, "w", encoding="utf-8") as f:
json.dump(error_report, f, indent=2, default=str)
print(f"📋 Comprehensive error report saved: {error_report_path}")
except Exception as e:
print(f"❌ Failed to save error report: {e}")
# Also save execution trace
try:
trace_path = os.path.join(log_dir, f"orka_trace_{timestamp}.json")
self.orchestrator.memory.save_to_file(trace_path)
print(f"📋 Execution trace saved: {trace_path}")
except Exception as e:
print(f"⚠️ Failed to save trace to memory backend: {e}")
return error_report_path
def _capture_memory_snapshot(self):
"""Capture current state of memory backend for debugging."""
try:
if hasattr(self.orchestrator.memory, "memory") and self.orchestrator.memory.memory:
return {
"total_entries": len(self.orchestrator.memory.memory),
"last_10_entries": self.orchestrator.memory.memory[-10:]
if len(self.orchestrator.memory.memory) >= 10
else self.orchestrator.memory.memory,
"backend_type": type(self.orchestrator.memory).__name__,
}
except Exception as e:
return {"error": f"Failed to capture memory snapshot: {e}"}
return {"status": "no_memory_data"}
async def run_with_error_handling(self, input_data):
"""
Run the orchestrator with comprehensive error handling.
Always returns a JSON report, even on failure, for debugging purposes.
"""
logs = []
# Store original run method
original_run = self.orchestrator.run
try:
            # Install per-agent error tracking hooks (currently a no-op placeholder)
self._patch_orchestrator_for_error_tracking()
# Run the orchestrator normally
result = await original_run(input_data)
# Check if any errors occurred during execution
if self.error_telemetry["errors"]:
print(
f"⚠️ [ORKA-WARNING] Execution completed with {len(self.error_telemetry['errors'])} errors",
)
self.error_telemetry["execution_status"] = "partial"
else:
self.error_telemetry["execution_status"] = "completed"
# Enhance the result with error telemetry
if isinstance(result, list):
# Standard successful result - logs list
enhanced_result = {
"status": "success",
"execution_logs": result,
"error_telemetry": self.error_telemetry,
"summary": self._get_execution_summary(result),
}
# Save the report even on success (with all telemetry)
error_report_path = self.save_comprehensive_error_report(result)
enhanced_result["report_path"] = error_report_path
return enhanced_result
else:
# Already an error result from orchestrator
result["error_telemetry"] = self.error_telemetry
return result
except Exception as critical_error:
# Critical failure - save everything we have so far
self.record_error(
"critical_failure",
"orchestrator",
f"Critical orchestrator failure: {critical_error}",
critical_error,
)
print(f"💥 [ORKA-CRITICAL] Orchestrator failed: {critical_error}")
# Try to get partial logs if possible
try:
if hasattr(self.orchestrator, "memory") and hasattr(
self.orchestrator.memory,
"memory",
):
logs = self.orchestrator.memory.memory[-50:] # Get last 50 entries
except Exception:
logs = []
error_report_path = self.save_comprehensive_error_report(logs, critical_error)
# Try to cleanup memory backend
try:
self.orchestrator.memory.close()
except Exception as cleanup_error:
print(f"⚠️ Failed to cleanup memory backend: {cleanup_error}")
# Return error report for debugging instead of raising
return {
"status": "critical_failure",
"error": str(critical_error),
"error_report_path": error_report_path,
"logs_captured": len(logs),
"error_telemetry": self.error_telemetry,
"traceback": traceback.format_exc(),
}
def _patch_orchestrator_for_error_tracking(self):
"""Add error tracking to orchestrator methods without breaking existing logic."""
# This could be expanded to patch individual agent run methods
# For now, we rely on the outer error handling
def _get_execution_summary(self, logs):
"""Get a summary of the execution."""
return {
"total_agents_executed": len(logs),
"total_errors": len(self.error_telemetry["errors"]),
"total_retries": sum(self.error_telemetry["retry_counters"].values()),
"execution_status": self.error_telemetry["execution_status"],
}
# Enhanced orchestrator wrapper function
async def run_orchestrator_with_error_handling(orchestrator, input_data):
"""
Enhanced wrapper function to run any orchestrator with comprehensive error handling.
Usage:
from orka.orchestrator_error_wrapper import run_orchestrator_with_error_handling
orchestrator = Orchestrator("config.yml")
result = await run_orchestrator_with_error_handling(orchestrator, input_data)
"""
error_handler = OrkaErrorHandler(orchestrator)
return await error_handler.run_with_error_handling(input_data)
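

# --- Minimal usage sketch (illustrative only) ---
# Assumes an Orchestrator class that accepts a YAML config path and exposes an
# async ``run`` method; the import path, config filename, and input payload
# below are hypothetical placeholders, not part of this module.
if __name__ == "__main__":
    import asyncio

    from orka.orchestrator import Orchestrator  # assumed import path

    async def _demo():
        orchestrator = Orchestrator("config.yml")  # hypothetical config file
        result = await run_orchestrator_with_error_handling(
            orchestrator,
            {"input": "example query"},  # hypothetical input payload
        )
        # The wrapper always returns a report dict, even on critical failure.
        print(result.get("status"), result.get("report_path"))

    asyncio.run(_demo())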