# Source code for orka.tui.textual_widgets
"""
Custom Textual widgets for OrKa memory monitoring.
"""
from datetime import datetime
from typing import Any, Dict, List, Optional
from textual.containers import Container
from textual.message import Message
from textual.widgets import DataTable, Static
[docs]
class StatsWidget(Static):
    """Text widget that shows memory statistics."""

    def __init__(self, data_manager, **kwargs):
        """Store the statistics provider and initialise the base widget.

        Args:
            data_manager: Object the widget reads its statistics from.
            **kwargs: Passed through unchanged to the base widget constructor.
        """
        super().__init__(**kwargs)
        self.data_manager = data_manager
[docs]
def update_stats(self):
    """Refresh the displayed text from the data manager's current stats."""
    current = self.data_manager.stats.current
    # Formatting is delegated so the rendering rules live in one place.
    self.update(self._format_stats(current))
def _format_stats(self, stats: Dict[str, Any]) -> str:
"""Format statistics for display using unified stats system."""
# π― USE UNIFIED: Get all stats from centralized calculation
unified = self.data_manager.get_unified_stats()
# Extract key metrics from unified stats
total_entries = unified["total_entries"]
stored_memories = unified["stored_memories"]
log_entries = unified["log_entries"]
backend = unified["backend"]
health = unified["health"]
trends = unified["trends"]
return f"""[bold]π Memory Statistics[/bold]
[metric-label]Total Entries:[/metric-label] [metric-value]{total_entries:,}[/metric-value] {trends["total_entries"]}
[metric-label]Short-term Memory:[/metric-label] [metric-value]{stored_memories["short_term"]:,}[/metric-value]
[metric-label]Long-term Memory:[/metric-label] [metric-value]{stored_memories["long_term"]:,}[/metric-value] {trends["stored_memories"]}
[metric-label]Orchestration Logs:[/metric-label] [metric-value]{log_entries["orchestration"]:,}[/metric-value]
[metric-label]Active Entries:[/metric-label] [metric-value]{backend["active_entries"]:,}[/metric-value]
[metric-label]Expired Entries:[/metric-label] [metric-value]{backend["expired_entries"]:,}[/metric-value]
[metric-label]Backend:[/metric-label] [status-info]{backend["type"]}[/status-info]
[metric-label]Status:[/metric-label] [status-good]Connected[/status-good]"""
[docs]
class MemoryTableWidget(DataTable):
    """Custom data table for displaying memory entries with checkbox selection."""

    class MemorySelected(Message):
        """Message sent when a memory row is selected or deselected."""

        def __init__(self, memory_data: Optional[Dict[str, Any]], row_index: int) -> None:
            """Record the selection payload.

            Args:
                memory_data: The selected memory entry, or ``None`` when the
                    selection handlers report a deselection.
                row_index: Row the event refers to.
            """
            self.memory_data = memory_data
            self.row_index = row_index
            super().__init__()

    def __init__(self, data_manager, memory_type="all", **kwargs):
        """Create the table and declare its columns.

        Args:
            data_manager: Provider of memory entries and field accessors.
            memory_type: Filter name remembered for later refreshes.
            **kwargs: Passed through unchanged to the base table constructor.
        """
        super().__init__(**kwargs)
        self.data_manager = data_manager
        self.memory_type = memory_type
        # Entries currently rendered, in row order, so a row index can be
        # mapped back to its memory.
        self.current_memories: List[Dict[str, Any]] = []
        # Key of the selected memory; kept across refreshes.
        self.selected_memory_key: Optional[str] = None

        # Whole rows are selected rather than single cells.
        self.cursor_type = "row"
        self.zebra_stripes = True

        self.add_columns(
            "β",  # Checkbox for selection
            "Time",  # When was it created
            "TTL",  # How long until expiry
            "Memory Key",  # Full access to memory key
            "Type",  # Memory type (short/long term)
            "Content",  # Content preview
            "Score",  # Importance score
        )
[docs]
def on_data_table_cell_selected(self, event: DataTable.CellSelected) -> None:
    """Toggle the checkbox of the row that contains the selected cell.

    Selecting the already-selected memory deselects it. After the toggle the
    table is refreshed and a ``MemorySelected`` message is posted: the memory
    and its row index on selection, ``(None, -1)`` on deselection, which is
    the same payload ``on_data_table_row_selected`` reports.

    Args:
        event: Cell-selection event; only ``event.coordinate.row`` is read.
    """
    try:
        row_index = event.coordinate.row
        if not 0 <= row_index < len(self.current_memories):
            return

        selected_memory = self.current_memories[row_index]
        memory_key = self.data_manager._get_key(selected_memory)

        if self.selected_memory_key == memory_key:
            # Deselect: there is no longer a meaningful row to report.
            self.selected_memory_key = None
            selected_memory = None
            row_index = -1
        else:
            self.selected_memory_key = memory_key

        # Redraw so the checkbox column reflects the new selection.
        self.update_data(self.memory_type)
        self.post_message(self.MemorySelected(selected_memory, row_index))
    except Exception as e:
        # UI boundary: report the problem instead of crashing the handler.
        if hasattr(self, "app"):
            self.app.notify(f"Selection error: {e!s}", severity="error")
[docs]
def on_data_table_row_selected(self, event: DataTable.RowSelected) -> None:
    """Toggle the checkbox of the row under the cursor.

    The row is taken from ``self.cursor_row`` rather than from ``event``.
    Selecting the already-selected memory clears the selection and posts
    ``MemorySelected(None, -1)``; otherwise the memory and its row index
    are posted. The table is refreshed before the message is sent.
    """
    try:
        idx = self.cursor_row
        if idx is None or not (0 <= idx < len(self.current_memories)):
            return

        memory = self.current_memories[idx]
        key = self.data_manager._get_key(memory)

        was_selected = self.selected_memory_key == key
        self.selected_memory_key = None if was_selected else key
        payload = (None, -1) if was_selected else (memory, idx)

        # Redraw first so the checkbox column is current when listeners react.
        self.update_data(self.memory_type)
        self.post_message(self.MemorySelected(*payload))
    except Exception as err:
        if hasattr(self, "app"):
            self.app.notify(f"Selection error: {err!s}", severity="error")
[docs]
def update_data(self, memory_type="all"):
    """Rebuild the table rows for ``memory_type`` while preserving the selection.

    At most the first 25 filtered memories are rendered and remembered in
    ``self.current_memories``. When a "short" or "long" view has no entries,
    a dimmed placeholder row explains why and the method returns without
    touching the selection. Otherwise, if the selected key is not among the
    rendered rows, the selection is cleared.

    Args:
        memory_type: Filter passed to ``data_manager.get_filtered_memories``.
    """
    # The checked and unchecked states must render as different glyphs,
    # otherwise the selection is invisible in the checkbox column.
    unchecked, checked = "☐", "✓"
    dash = "[dim]--[/dim]"

    def add_placeholder(title, hint):
        """Add one dimmed, non-selectable explanatory row."""
        self.add_row(unchecked, dash, dash, f"[dim]{title}[/dim]", dash, f"[dim]{hint}[/dim]", dash)

    self.clear()

    memories = self.data_manager.get_filtered_memories(memory_type)
    self.current_memories = memories[:25]  # exactly the rows being displayed

    if not memories and memory_type in ("short", "long"):
        stored = self.data_manager.get_unified_stats()["stored_memories"]
        if stored["total"] == 0:
            add_placeholder(
                "No stored memories found",
                "Create memories using memory-writer nodes",
            )
        else:
            short_count = stored["short_term"]
            long_count = stored["long_term"]
            if memory_type == "short" and short_count == 0:
                add_placeholder(
                    "No short-term memories",
                    f"Found {long_count} long-term instead",
                )
            elif memory_type == "long" and long_count == 0:
                add_placeholder(
                    "No long-term memories",
                    f"Found {short_count} short-term instead",
                )
        self.current_memories = []
        return

    dm = self.data_manager
    selection_visible = False
    for memory in self.current_memories:
        memory_key = dm._get_key(memory)
        is_selected = self.selected_memory_key == memory_key
        selection_visible = selection_visible or is_selected

        self.add_row(
            checked if is_selected else unchecked,
            self._format_enhanced_timestamp(dm._get_timestamp(memory)),
            self._format_enhanced_ttl(dm._get_ttl_formatted(memory)),
            self._format_memory_key(memory_key),
            self._format_memory_type(dm._get_memory_type(memory), dm._get_node_id(memory)),
            self._format_content_preview(dm._get_content(memory)),
            f"[cyan]{dm._get_importance_score(memory):.1f}[/cyan]",
        )

    # The selected memory is no longer displayed (e.g. it expired): forget it.
    if not selection_visible:
        self.selected_memory_key = None
def _format_enhanced_timestamp(self, timestamp) -> str:
"""Enhanced timestamp formatting with relative time info."""
try:
if timestamp > 1000000000000: # milliseconds
dt = datetime.fromtimestamp(timestamp / 1000)
else: # seconds
dt = datetime.fromtimestamp(timestamp)
# Current time for relative calculations
now = datetime.now()
diff = now - dt
# Format based on age
if diff.total_seconds() < 60: # Less than 1 minute
return f"[green]{dt.strftime('%H:%M:%S')}[/green]"
elif diff.total_seconds() < 3600: # Less than 1 hour
mins = int(diff.total_seconds() / 60)
return f"[yellow]{dt.strftime('%H:%M')}[/yellow] [dim](-{mins}m)[/dim]"
elif diff.total_seconds() < 86400: # Less than 1 day
hours = int(diff.total_seconds() / 3600)
return f"[orange]{dt.strftime('%H:%M')}[/orange] [dim](-{hours}h)[/dim]"
else: # More than 1 day
days = int(diff.total_seconds() / 86400)
return f"[red]{dt.strftime('%m/%d')}[/red] [dim](-{days}d)[/dim]"
except:
return "[dim]Unknown[/dim]"
def _format_enhanced_ttl(self, ttl_formatted) -> str:
"""Enhanced TTL formatting with urgency indicators."""
if ttl_formatted == "Never" or ttl_formatted == "β" or not ttl_formatted:
return "[blue]βΎοΈ Never[/blue]"
# Parse TTL for urgency classification
ttl_str = str(ttl_formatted).lower()
if "s" in ttl_str and "m" not in ttl_str and "h" not in ttl_str:
# Seconds only - critical urgency
return f"[red]π¨ {ttl_formatted}[/red]"
elif "m" in ttl_str and "h" not in ttl_str:
# Minutes only - high urgency
return f"[yellow]β οΈ {ttl_formatted}[/yellow]"
elif "h" in ttl_str:
# Hours - medium urgency
if ttl_str.startswith("1h") or ttl_str.startswith("2h"):
return f"[orange]β° {ttl_formatted}[/orange]"
else:
return f"[green]π {ttl_formatted}[/green]"
else:
return f"[cyan]{ttl_formatted}[/cyan]"
def _format_memory_key(self, memory_key) -> str:
"""Format memory key with intelligent truncation."""
if not memory_key:
return "[dim]<no-key>[/dim]"
key_str = str(memory_key)
# Show meaningful parts of the key
if len(key_str) <= 25:
return f"[bright_blue]{key_str}[/bright_blue]"
else:
# Smart truncation: show start and end
start = key_str[:12]
end = key_str[-10:]
return (
f"[bright_blue]{start}[/bright_blue][dim]...[/dim][bright_blue]{end}[/bright_blue]"
)
def _format_memory_type(self, memory_type, node_id) -> str:
"""Format memory type with icons and node info."""
# Decode if bytes
if isinstance(memory_type, bytes):
memory_type = memory_type.decode("utf-8", errors="ignore")
if isinstance(node_id, bytes):
node_id = node_id.decode("utf-8", errors="ignore")
# Format based on type
if memory_type == "short_term":
icon = "β‘"
color = "yellow"
elif memory_type == "long_term":
icon = "π§ "
color = "green"
else:
icon = "π"
color = "white"
memory_type = memory_type or "unknown"
# Include node info for context
node_short = str(node_id)[:8] if node_id else "?"
return f"[{color}]{icon} {memory_type}[/{color}] [dim]({node_short})[/dim]"
def _format_content_preview(self, content) -> str:
"""Smart content preview with better truncation."""
if not content:
return "[dim]<empty>[/dim]"
# Decode if bytes
if isinstance(content, bytes):
content = content.decode("utf-8", errors="replace")
content_str = str(content).strip()
# Remove common JSON/structured prefixes for cleaner display
if content_str.startswith('{"') and content_str.endswith('"}'):
# Try to extract meaningful text from JSON
try:
import json
data = json.loads(content_str)
if isinstance(data, dict):
# Look for meaningful fields
for key in ["content", "text", "message", "description", "prompt"]:
if data.get(key):
content_str = str(data[key])
break
except:
pass
# Intelligent truncation
if len(content_str) <= 35:
return f"[white]{content_str}[/white]"
else:
# Find a good break point (space, comma, period)
truncated = content_str[:32]
for i in range(len(truncated) - 1, max(20, len(truncated) - 10), -1):
if truncated[i] in [" ", ",", ".", ";"]:
truncated = truncated[:i]
break
return f"[white]{truncated}[/white][dim]...[/dim]"
def _get_filtered_memories(self) -> List[Dict]:
"""Get memories filtered by type."""
return self.data_manager.get_filtered_memories(self.memory_type)
def _is_short_term(self, memory: Dict) -> bool:
"""Check if memory is short-term based on TTL."""
ttl = (
memory.get("ttl_seconds")
or memory.get("ttl")
or memory.get("expires_at")
or memory.get("expiry")
)
if ttl is None or ttl == "" or ttl == -1:
return False
try:
# Handle string TTL values
if isinstance(ttl, str):
if ttl.lower() in ["none", "null", "infinite", "β", ""]:
return False
ttl_val = int(float(ttl))
else:
ttl_val = int(ttl)
if ttl_val <= 0:
return False
return ttl_val < 3600 # Less than 1 hour
except (ValueError, TypeError):
return False
def _format_timestamp(self, timestamp) -> str:
"""Format timestamp for display."""
if not timestamp:
return "N/A"
try:
if isinstance(timestamp, (int, float)):
dt = datetime.fromtimestamp(timestamp)
else:
dt = datetime.fromisoformat(str(timestamp))
return dt.strftime("%H:%M:%S")
except:
return str(timestamp)[:8]
def _format_type(self, log_type: str) -> str:
"""Format log type with color coding."""
type_colors = {
"memory": "[memory-short]MEM[/memory-short]",
"orchestration": "[memory-long]ORC[/memory-long]",
"system": "[status-info]SYS[/status-info]",
}
return type_colors.get(log_type, log_type.upper()[:3])
def _format_ttl(self, ttl) -> str:
"""Format TTL for display."""
if ttl is None or ttl == "" or ttl == -1:
return "β"
try:
# Handle string TTL values
if isinstance(ttl, str):
if ttl.lower() in ["none", "null", "infinite", "β", ""]:
return "β"
ttl_val = int(float(ttl))
else:
ttl_val = int(ttl)
if ttl_val <= 0:
return "β"
elif ttl_val < 60:
return f"{ttl_val}s"
elif ttl_val < 3600:
return f"{ttl_val // 60}m"
elif ttl_val < 86400:
return f"{ttl_val // 3600}h"
else:
return f"{ttl_val // 86400}d"
except (ValueError, TypeError):
return str(ttl) if ttl else "β"
def _format_size(self, size) -> str:
"""Format size for display."""
if not size:
return "0B"
try:
size_val = int(size)
if size_val < 1024:
return f"{size_val}B"
elif size_val < 1024 * 1024:
return f"{size_val // 1024}KB"
else:
return f"{size_val // (1024 * 1024)}MB"
except (ValueError, TypeError):
return str(size)
def _truncate(self, text: str, max_len: int) -> str:
"""Truncate text to maximum length."""
if not text:
return ""
text_str = str(text)
return text_str[: max_len - 3] + "..." if len(text_str) > max_len else text_str
[docs]
class HealthWidget(Container):
    """Container widget that presents system health metrics."""

    def __init__(self, data_manager, **kwargs):
        """Store the health data provider and initialise the container.

        Args:
            data_manager: Object the widget reads health data from.
            **kwargs: Passed through unchanged to the base container constructor.
        """
        super().__init__(**kwargs)
        self.data_manager = data_manager
[docs]
def compose(self):
    """Yield the widget's children: a title line and the content area."""
    heading = Static("π₯ System Health", classes="container")
    yield heading
    # Starts empty and is addressable by its id.
    body = Static("", id="health-content")
    yield body
[docs]
def update_health(self):
    """Render the unified health data into the ``#health-content`` child.

    Reads ``health``, ``backend`` and ``performance`` from
    ``data_manager.get_unified_stats()``. "Memory Usage" is the share of
    active entries among active plus expired entries (0 when there are none).

    Raises:
        KeyError: If the unified stats lack one of the keys read below.
    """
    unified = self.data_manager.get_unified_stats()
    health = unified["health"]
    backend = unified["backend"]
    performance = unified["performance"]

    def describe(section):
        """Return the "<icon> <message>" summary for one health section."""
        entry = health[section]
        return f"{entry['icon']} {entry['message']}"

    overall_text = describe("overall")
    memory_text = describe("memory")
    backend_text = describe("backend")
    perf_text = describe("performance")

    active = backend["active_entries"]
    total = active + backend["expired_entries"]
    usage_pct = (active / total * 100) if total > 0 else 0

    # Computed outside the template so the status literals stay on one line.
    decay_text = "✅ Active" if backend["decay_enabled"] else "❌ Inactive"

    health_content = f"""
[bold]π₯ System Health Monitor[/bold]
[metric-label]Overall Status:[/metric-label] {overall_text}
[metric-label]Memory Health:[/metric-label] {memory_text}
[metric-label]Backend Status:[/metric-label] {backend_text}
[metric-label]Performance:[/metric-label] {perf_text}
[metric-label]Memory Usage:[/metric-label] [metric-value]{usage_pct:.1f}%[/metric-value]
[metric-label]Response Time:[/metric-label] [metric-value]{performance["search_time"]:.3f}s[/metric-value]
[metric-label]Backend Type:[/metric-label] [status-info]{backend["type"]}[/status-info]
[metric-label]Decay Status:[/metric-label] {decay_text}
"""
    # Write into the child created by compose(): ``update()`` is provided by
    # ``Static``, not by this ``Container``.
    self.query_one("#health-content").update(health_content)
[docs]
class LogsWidget(DataTable):
    """Table widget that lists memory logs, orchestration entries first."""

    def __init__(self, data_manager, **kwargs):
        """Create the table and declare its five columns.

        Args:
            data_manager: Provider of log entries and field accessors.
            **kwargs: Passed through unchanged to the base table constructor.
        """
        super().__init__(**kwargs)
        self.data_manager = data_manager
        headers = ("Time", "Node", "Type", "Content", "Details")
        self.add_columns(*headers)
[docs]
def update_data(self):
    """Rebuild the log overview table.

    Rows, in order: a summary row with the unified log counts, a blank
    separator, the 8 most recent orchestration logs (log type ``"log"``)
    under a header row, then a separator, header and the 3 most recent
    system logs (log type ``"system"``). When neither group has entries, a
    single "no data" row follows the separator instead.
    """
    dm = self.data_manager

    def clock(timestamp):
        """Format an epoch time (milliseconds if above 1e12) as HH:MM:SS."""
        try:
            seconds = timestamp / 1000 if timestamp > 1000000000000 else timestamp
            return datetime.fromtimestamp(seconds).strftime("%H:%M:%S")
        except (TypeError, ValueError, OverflowError, OSError):
            # Only bad or unconvertible timestamps are tolerated here.
            return "Unknown"

    def clip(text, limit):
        """Cut ``text`` to ``limit`` characters, marking the cut with "..."."""
        return text[:limit] + "..." if len(text) > limit else text

    def add_section(logs, limit, color, node_label, title, type_label):
        """Add one header row plus up to ``limit`` of the given log rows."""
        self.add_row(
            f"[{color}]--:--:--[/{color}]",
            f"[{color}]{node_label}[/{color}]",
            f"[{color}]HEADER[/{color}]",
            f"[{color}]{title}[/{color}]",
            f"[{color}]Last {limit} entries[/{color}]",
        )
        for log in logs[:limit]:
            self.add_row(
                clock(dm._get_timestamp(log)),
                dm._get_node_id(log)[:12],  # keep the node column narrow
                f"[{color}]{type_label}[/{color}]",
                clip(dm._get_content(log), 35),
                clip(self._extract_log_details(log), 20),
            )

    self.clear()

    log_entries = dm.get_unified_stats()["log_entries"]
    all_logs = dm.get_filtered_memories("logs")

    # Most recent first within each group.
    orchestration_logs = sorted(
        (log for log in all_logs if dm._get_log_type(log) == "log"),
        key=dm._get_timestamp,
        reverse=True,
    )
    system_logs = sorted(
        (log for log in all_logs if dm._get_log_type(log) == "system"),
        key=dm._get_timestamp,
        reverse=True,
    )

    summary_details = (
        f"Orchestration: {log_entries['orchestration']} | System: {log_entries['system']}"
    )
    self.add_row(
        "[bold]--:--:--[/bold]",
        "[bold]SUMMARY[/bold]",
        "[bold]OVERVIEW[/bold]",
        f"[bold]π Total Logs: {log_entries['total']}[/bold]",
        f"[bold]{summary_details}[/bold]",
    )
    self.add_row("", "", "", "", "")  # separator

    if orchestration_logs:
        add_section(
            orchestration_logs,
            8,
            "cyan",
            "ORCHESTRATION",
            "π Recent Orchestration Activity",
            "orchestration",
        )

    if system_logs:
        self.add_row("", "", "", "", "")  # separator
        add_section(
            system_logs,
            3,
            "yellow",
            "SYSTEM",
            "π§ Recent System Activity",
            "system",
        )

    if not orchestration_logs and not system_logs:
        self.add_row(
            "[dim]--:--:--[/dim]",
            "[dim]NO DATA[/dim]",
            "[dim]EMPTY[/dim]",
            "[dim]No recent logs found[/dim]",
            "[dim]Run workflows to generate logs[/dim]",
        )
def _extract_log_details(self, log):
"""Extract key details from log entry for overview."""
# Try to get trace_id, importance, or other key details
trace_id = self.data_manager._get_safe_field(log, "trace_id", "trace", default="")
importance = self.data_manager._get_importance_score(log)
# Build details string
details_parts = []
if trace_id and trace_id != "unknown":
details_parts.append(f"trace:{trace_id[:8]}")
if importance > 0:
details_parts.append(f"imp:{importance:.1f}")
# Check for special fields in metadata
metadata = log.get("metadata", {})
if isinstance(metadata, dict):
category = self.data_manager._safe_decode(metadata.get("category", ""))
if category and category != "unknown":
details_parts.append(f"cat:{category[:5]}")
return " | ".join(details_parts) if details_parts else "standard"