Source code for orka.tui.data_manager

"""
Data management for TUI interface - statistics, caching, and data fetching.
"""

import logging
import os
import time
from collections import deque
from typing import Any, Dict

from ..memory_logger import create_memory_logger


class MemoryStats:
    """Latest statistics snapshot plus a bounded history of earlier snapshots.

    ``current`` holds the most recent snapshot (a dict with an added
    ``"timestamp"`` entry); ``history`` is a deque capped at ``max_history``
    snapshots, oldest first.
    """

    def __init__(self, max_history: int = 100):
        self.max_history = max_history
        self.history: deque = deque(maxlen=max_history)
        self.current: Dict[str, Any] = {}

    def update(self, stats: Dict[str, Any]):
        """Store *stats* as the current snapshot and append a copy to the history.

        The caller's dict is not modified; the snapshot is a shallow copy
        stamped with ``time.time()`` under the ``"timestamp"`` key.
        """
        snapshot = stats.copy()
        snapshot["timestamp"] = time.time()
        self.current = snapshot
        # History gets its own copy so later edits to ``current`` do not leak in.
        self.history.append(snapshot.copy())

    def get_trend(self, key: str, window: int = 10) -> str:
        """Return an arrow describing how *key* moved over the last *window* snapshots.

        Returns "↗" when the newest value is above the oldest one in the
        window, "↘" when it is below, and "→" when they are equal or fewer
        than two snapshots in the window carry *key*.
        """
        flat = "→"
        if len(self.history) < 2:
            return flat

        tail = list(self.history)[-window:]
        if len(tail) < 2:
            return flat

        # Only snapshots that actually contain the metric take part.
        series = [snap.get(key, 0) for snap in tail if key in snap]
        if len(series) < 2:
            return flat

        oldest, newest = series[0], series[-1]
        if newest > oldest:
            return "↗"
        if newest < oldest:
            return "↘"
        return flat

    def get_rate(self, key: str, window: int = 5) -> float:
        """Return the per-second rate of change of *key* across the last *window* snapshots.

        The rate is computed between the first and last snapshot of the window.
        Returns 0.0 when there are fewer than two snapshots, when either
        endpoint lacks *key*, or when no time elapsed between the endpoints.
        """
        if len(self.history) < 2:
            return 0.0

        tail = list(self.history)[-window:]
        if len(tail) < 2:
            return 0.0

        start, end = tail[0], tail[-1]
        if not (key in start and key in end):
            return 0.0

        delta = end[key] - start[key]
        elapsed = end["timestamp"] - start["timestamp"]
        return delta / elapsed if elapsed > 0 else 0.0
class DataManager:
    """Manages data fetching and caching for the TUI interface.

    Instance state:
        memory_logger: backend logger created by ``init_memory_logger`` (``None`` until then).
        backend: backend name chosen in ``init_memory_logger``.
        stats: ``MemoryStats`` holding the latest backend statistics and their history.
        memory_data: de-duplicated list of memory/log entries from the last refresh.
        performance_history: bounded deque of performance-metric snapshots.
    """

    def __init__(self):
        self.memory_logger = None
        self.backend = None
        self.stats = MemoryStats()
        self.memory_data = []
        self.performance_history = deque(maxlen=60)  # 1 minute at 1s intervals

    def init_memory_logger(self, args):
        """Create the memory logger for the selected backend.

        The backend is taken from ``args.backend`` when set, otherwise from the
        ``ORKA_MEMORY_BACKEND`` environment variable (default ``"redisstack"``).
        ``REDIS_URL`` overrides the per-backend default Redis URL.
        """
        self.backend = getattr(args, "backend", None) or os.getenv(
            "ORKA_MEMORY_BACKEND",
            "redisstack",
        )

        # The default port differs per backend: 6380 for redisstack, 6379 otherwise.
        if self.backend == "redisstack":
            default_url = "redis://localhost:6380/0"
        else:
            default_url = "redis://localhost:6379/0"
        redis_url = os.getenv("REDIS_URL", default_url)

        self.memory_logger = create_memory_logger(backend=self.backend, redis_url=redis_url)

    def update_data(self):
        """Refresh statistics, memory entries and performance metrics.

        This is best-effort: any backend failure leaves the previously cached
        data in place and is reported at DEBUG level only, so nothing is
        written to the terminal unless logging has been configured.
        Does nothing when no memory logger has been initialised.
        """
        memory_logger = self.memory_logger
        if memory_logger is None:
            return

        try:
            self.stats.update(memory_logger.get_memory_stats())
            self.memory_data = self._collect_memories(memory_logger)

            if hasattr(memory_logger, "get_performance_metrics"):
                metrics = memory_logger.get_performance_metrics()
                if metrics is not None:
                    # Copy so the backend's own dict is not mutated by the timestamp.
                    snapshot = dict(metrics)
                    snapshot["timestamp"] = time.time()
                    self.performance_history.append(snapshot)
        except Exception:
            # Keep the TUI alive; stale data is preferable to a crashed display.
            logging.getLogger(__name__).debug("TUI data refresh failed", exc_info=True)

    def _collect_memories(self, memory_logger):
        """Fetch stored memories and orchestration logs, de-duplicated by key."""
        by_key = {}

        # Stored memories: prefer the dedicated accessor, fall back to a search
        # only when the accessor does not exist.
        # NOTE(review): when the accessor exists but returns nothing, no fallback
        # search is made — this mirrors the original control flow; confirm intent.
        if hasattr(memory_logger, "get_recent_stored_memories"):
            stored = memory_logger.get_recent_stored_memories(20)
        elif hasattr(memory_logger, "search_memories"):
            stored = memory_logger.search_memories(
                query=" ",
                num_results=20,
                log_type="memory",
            )
        else:
            stored = None

        for memory in stored or ():
            by_key[self._get_key(memory)] = memory

        # Orchestration logs are a separate search using log_type "log".
        if hasattr(memory_logger, "search_memories"):
            try:
                logs = memory_logger.search_memories(
                    query=" ",
                    num_results=20,
                    log_type="log",
                )
                for memory in logs or ():
                    # Stored memories win over log entries that share a key.
                    by_key.setdefault(self._get_key(memory), memory)
            except Exception:
                # Some backends might not support this search.
                logging.getLogger(__name__).debug(
                    "Orchestration log search failed",
                    exc_info=True,
                )

        return list(by_key.values())

    def is_short_term_memory(self, memory):
        """Check if a memory entry is short-term (TTL < 1 hour).

        The first truthy value among ``ttl_seconds``, ``ttl``, ``expires_at``
        and ``expiry`` is interpreted as a number of seconds.
        NOTE(review): ``expires_at``/``expiry`` are treated as durations here,
        exactly as in the original code — confirm they are not absolute times.

        Returns ``False`` for missing, non-positive, infinite or unparseable TTLs.
        """
        ttl = (
            memory.get("ttl_seconds")
            or memory.get("ttl")
            or memory.get("expires_at")
            or memory.get("expiry")
        )

        if ttl is None or ttl == "" or ttl == -1:
            return False

        try:
            if isinstance(ttl, str):
                if ttl.lower() in ["none", "null", "infinite", "∞", ""]:
                    return False
                ttl_val = int(float(ttl))
            else:
                ttl_val = int(ttl)
        except (ValueError, TypeError, OverflowError):
            # OverflowError covers "inf" / float("inf"), i.e. a TTL that never expires.
            return False

        return 0 < ttl_val < 3600  # Less than 1 hour

    @staticmethod
    def _metadata_of(memory):
        """Return the entry's ``metadata`` dict, or an empty dict if absent or not a dict."""
        metadata = memory.get("metadata")
        return metadata if isinstance(metadata, dict) else {}

    def _get_memory_type(self, memory):
        """Return ``"short_term"``, ``"long_term"`` or ``"unknown"`` for a memory entry.

        The top-level ``memory_type`` field is checked first, then
        ``metadata["memory_type"]``; bytes values are decoded before comparison.
        """
        candidates = (
            memory.get("memory_type"),
            self._metadata_of(memory).get("memory_type"),
        )
        for candidate in candidates:
            if not candidate:
                continue
            decoded = self._safe_decode(candidate)
            if decoded in ("short_term", "long_term"):
                return decoded
        return "unknown"

    def _stored_of_type(self, wanted):
        """Return stored memories (log type ``"memory"``) whose memory type equals *wanted*."""
        return [
            m
            for m in self.memory_data
            if self._get_log_type(m) == "memory" and self._get_memory_type(m) == wanted
        ]

    def get_filtered_memories(self, memory_type="all"):
        """Get memories filtered by type using the actual memory_type field.

        ``"short"`` and ``"long"`` select stored memories of that memory type,
        ``"logs"`` selects entries whose log type is ``"log"`` or ``"system"``,
        and any other value returns the cached list itself.
        """
        if memory_type == "short":
            return self._stored_of_type("short_term")
        if memory_type == "long":
            return self._stored_of_type("long_term")
        if memory_type == "logs":
            return [m for m in self.memory_data if self._get_log_type(m) in ("log", "system")]
        return self.memory_data

    def _get_log_type(self, memory):
        """Extract the log type: metadata ``log_type``, then ``log_type``, then ``type``."""
        return (
            self._safe_decode(self._metadata_of(memory).get("log_type"))
            or self._safe_decode(memory.get("log_type"))
            or self._safe_decode(memory.get("type"))
            or "unknown"
        )

    def _get_content(self, memory):
        """Extract and decode content from ``content``, ``message`` or ``data``."""
        content = memory.get("content") or memory.get("message") or memory.get("data") or ""
        return self._safe_decode(content)

    def _get_key(self, memory):
        """Extract and decode the key from ``key``, ``id`` or ``node_id`` (default "unknown")."""
        key = memory.get("key") or memory.get("id") or memory.get("node_id") or "unknown"
        return self._safe_decode(key)

    def _get_node_id(self, memory):
        """Extract and decode node_id from memory entry."""
        return self._get_safe_field(memory, "node_id", "node", "id", default="unknown")

    def _get_timestamp(self, memory):
        """Extract the timestamp as an int; return 0 when missing or unparseable.

        Accepts ints, floats, and str/bytes holding an integer or a float.
        """
        raw = memory.get("timestamp", 0)
        if isinstance(raw, bytes):
            try:
                raw = raw.decode()
            except UnicodeDecodeError:
                return 0
        if not raw:
            return 0

        # Try exact integer parsing first so large integer timestamps keep full precision.
        try:
            return int(raw)
        except (TypeError, ValueError, OverflowError):
            pass
        try:
            return int(float(raw))
        except (TypeError, ValueError, OverflowError):
            return 0

    def _get_importance_score(self, memory):
        """Extract the importance score as a float; return 0.0 when missing or unparseable."""
        raw = memory.get("importance_score", 0)
        if isinstance(raw, bytes):
            try:
                raw = raw.decode()
            except UnicodeDecodeError:
                return 0.0
        if not raw:
            return 0.0
        try:
            return float(raw)
        except (TypeError, ValueError, OverflowError):
            return 0.0

    def _get_ttl_formatted(self, memory):
        """Extract formatted TTL from memory entry."""
        return self._get_safe_field(memory, "ttl_formatted", "ttl", default="?")

    def get_memory_distribution(self):
        """Get distribution of memory types and log types for diagnostic purposes.

        Returns a dict with ``total_entries``, ``by_log_type``, ``by_memory_type``,
        ``stored_memories`` (total / short_term / long_term / unknown) and
        ``log_entries`` (total / by_type).
        """
        by_log_type = {}
        by_memory_type = {}
        stored = {"total": 0, "short_term": 0, "long_term": 0, "unknown": 0}
        logs_by_type = {}
        log_total = 0

        for memory in self.memory_data:
            log_type = self._get_log_type(memory)
            memory_type = self._get_memory_type(memory)

            by_log_type[log_type] = by_log_type.get(log_type, 0) + 1
            by_memory_type[memory_type] = by_memory_type.get(memory_type, 0) + 1

            if log_type == "memory":
                # Stored memories are additionally broken down by memory type.
                stored["total"] += 1
                stored[memory_type] += 1
            else:
                log_total += 1
                logs_by_type[log_type] = logs_by_type.get(log_type, 0) + 1

        return {
            "total_entries": len(self.memory_data),
            "by_log_type": by_log_type,
            "by_memory_type": by_memory_type,
            "stored_memories": stored,
            "log_entries": {
                "total": log_total,
                "by_type": logs_by_type,
            },
        }

    # Unified data calculation system

    def get_unified_stats(self):
        """
        Get unified, comprehensive statistics for all TUI components.

        Combines the entry distribution, backend statistics, the latest
        performance snapshot, health indicators, and trend/rate figures from
        the statistics history into a single dict.
        """
        distribution = self.get_memory_distribution()
        backend_stats = self.stats.current
        stored = distribution["stored_memories"]
        latest_perf = self.performance_history[-1] if self.performance_history else {}

        return {
            # === CORE COUNTS ===
            "total_entries": distribution["total_entries"],
            "stored_memories": {
                "total": stored["total"],
                "short_term": stored["short_term"],
                "long_term": stored["long_term"],
                "unknown": stored["unknown"],
            },
            "log_entries": {
                "total": distribution["log_entries"]["total"],
                "orchestration": distribution["by_log_type"].get("log", 0),
                "system": distribution["by_log_type"].get("system", 0),
                "by_type": distribution["log_entries"]["by_type"],
            },
            # === BACKEND METRICS ===
            "backend": {
                "type": self.backend,
                "connected": self.memory_logger is not None,
                "active_entries": backend_stats.get("active_entries", 0),
                "expired_entries": backend_stats.get("expired_entries", 0),
                "total_streams": backend_stats.get("total_streams", 0),
                "decay_enabled": backend_stats.get("decay_enabled", False),
            },
            # === PERFORMANCE METRICS ===
            "performance": {
                "has_data": len(self.performance_history) > 0,
                "latest": latest_perf,
                "search_time": latest_perf.get("average_search_time", 0),
            },
            # === HEALTH INDICATORS ===
            "health": {
                "overall": self._calculate_overall_health(),
                "memory": self._calculate_memory_health(),
                "backend": self._calculate_backend_health(),
                "performance": self._calculate_performance_health(),
            },
            # === TRENDS (based on historical data) ===
            "trends": {
                "total_entries": self.stats.get_trend("total_entries"),
                "stored_memories": self.stats.get_trend("stored_memories"),
                "orchestration_logs": self.stats.get_trend("orchestration_logs"),
                "active_entries": self.stats.get_trend("active_entries"),
            },
            # === RATES (items per second) ===
            "rates": {
                "total_entries": self.stats.get_rate("total_entries"),
                "stored_memories": self.stats.get_rate("stored_memories"),
                "orchestration_logs": self.stats.get_rate("orchestration_logs"),
            },
            # === RAW DISTRIBUTION (for debugging) ===
            "raw_distribution": distribution,
        }

    @staticmethod
    def _health(status, icon, message):
        """Build a health-indicator dict."""
        return {"status": status, "icon": icon, "message": message}

    def _calculate_overall_health(self):
        """Calculate overall system health from the expired-entry ratio."""
        if not self.memory_logger:
            return self._health("critical", "🔴", "No Connection")

        stats = self.stats.current
        total = stats.get("total_entries", 0)
        expired = stats.get("expired_entries", 0)

        if total == 0:
            return self._health("warning", "🟡", "No Data")

        expired_ratio = expired / total if total > 0 else 0

        if expired_ratio < 0.1:
            return self._health("healthy", "🟢", "Healthy")
        if expired_ratio < 0.3:
            return self._health("degraded", "🟡", "Degraded")
        return self._health("critical", "🔴", "Critical")

    def _calculate_memory_health(self):
        """Calculate memory system health from the expired and active entry ratios."""
        stats = self.stats.current
        total = stats.get("total_entries", 0)
        active = stats.get("active_entries", 0)
        expired = stats.get("expired_entries", 0)

        if total == 0:
            return self._health("warning", "🟡", "No Data")

        expired_ratio = expired / total if total > 0 else 0
        active_ratio = active / total if total > 0 else 0

        if expired_ratio < 0.1 and active_ratio > 0.8:
            return self._health("healthy", "🟢", "Healthy")
        if expired_ratio < 0.3:
            return self._health("degraded", "🟡", "Degraded")
        return self._health("critical", "🔴", "Critical")

    def _calculate_backend_health(self):
        """Calculate backend connection health.

        Loggers exposing ``redis_client`` are pinged; loggers exposing only
        ``client`` are assumed connected; anything else is reported as limited.
        Only ``Exception`` subclasses are absorbed, so Ctrl-C still interrupts.
        """
        if not self.memory_logger:
            return self._health("critical", "🔴", "Disconnected")

        try:
            if hasattr(self.memory_logger, "redis_client"):
                # RedisStack and Redis loggers use redis_client: test real connectivity.
                try:
                    if self.memory_logger.redis_client.ping():
                        return self._health("healthy", "🟢", "Connected")
                    return self._health("warning", "🟡", "Limited")
                except Exception:
                    return self._health("warning", "🟡", "Limited")
            elif hasattr(self.memory_logger, "client"):
                # Other memory loggers might use client.
                return self._health("healthy", "🟢", "Connected")
            else:
                # Memory logger exists but has no known client attribute.
                return self._health("warning", "🟡", "Limited")
        except Exception:
            return self._health("critical", "🔴", "Error")

    def _calculate_performance_health(self):
        """Calculate performance health from the latest ``average_search_time``."""
        if not self.performance_history:
            return self._health("unknown", "❓", "No Data")

        search_time = self.performance_history[-1].get("average_search_time", 0)

        if search_time < 0.1:
            return self._health("excellent", "⚡", "Fast")
        if search_time < 0.5:
            return self._health("good", "✅", "Good")
        if search_time < 1.0:
            return self._health("moderate", "⚠️", "Moderate")
        return self._health("slow", "🐌", "Slow")

    # Centralized data extraction methods (handle bytes consistently)

    def _safe_decode(self, value):
        """Return *value* as a string: bytes are UTF-8 decoded, ``None`` becomes ``""``."""
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="ignore")
        return str(value) if value is not None else ""

    def _get_safe_field(self, memory, *field_names, default="unknown"):
        """Return the first non-``None`` field among *field_names*, decoded; else *default*."""
        for field_name in field_names:
            value = memory.get(field_name)
            if value is not None:
                return self._safe_decode(value)
        return default

    def debug_memory_data(self):
        """Debug method: print log type, memory type and key of the first three entries."""
        for i, memory in enumerate(self.memory_data[:3]):
            print(
                f" {i + 1}. log_type={self._get_log_type(memory)}, memory_type={self._get_memory_type(memory)}, key={self._get_key(memory)[:20]}...",
            )