Source code for orka.cli.memory.commands
# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning
"""
Memory CLI Commands
==================
This module contains CLI commands for memory management operations including
statistics, cleanup, and configuration.
"""
import json
import os
import sys
from orka.memory_logger import create_memory_logger
[docs]
def memory_stats(args):
"""Display memory usage statistics."""
try:
# Get backend from args or environment, default to redisstack for best performance
backend = getattr(args, "backend", None) or os.getenv("ORKA_MEMORY_BACKEND", "redisstack")
# Provide proper Redis URL based on backend
if backend == "redisstack":
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
else:
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Try RedisStack first for enhanced performance, fallback to Redis if needed
try:
memory = create_memory_logger(backend=backend, redis_url=redis_url)
except ImportError as e:
if backend == "redisstack":
print(f"⚠️ RedisStack not available ({e}), falling back to Redis", file=sys.stderr)
backend = "redis"
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
memory = create_memory_logger(backend=backend, redis_url=redis_url)
else:
raise
# Get statistics
stats = memory.get_memory_stats()
# Display results
if args.json:
output = {"stats": stats}
print(json.dumps(output, indent=2))
else:
print("=== OrKa Memory Statistics ===")
print(f"Backend: {stats.get('backend', backend)}")
print(f"Decay Enabled: {stats.get('decay_enabled', False)}")
print(f"Total Streams: {stats.get('total_streams', 0)}")
print(f"Total Entries: {stats.get('total_entries', 0)}")
print(f"Expired Entries: {stats.get('expired_entries', 0)}")
if stats.get("entries_by_type"):
print("\nEntries by Type:")
for event_type, count in stats["entries_by_type"].items():
print(f" {event_type}: {count}")
if stats.get("entries_by_memory_type"):
print("\nEntries by Memory Type:")
for memory_type, count in stats["entries_by_memory_type"].items():
print(f" {memory_type}: {count}")
if stats.get("entries_by_category"):
print("\nEntries by Category:")
for category, count in stats["entries_by_category"].items():
if count > 0: # Only show categories with entries
print(f" {category}: {count}")
if stats.get("decay_config"):
print("\nDecay Configuration:")
config = stats["decay_config"]
print(f" Short-term retention: {config.get('short_term_hours')}h")
print(f" Long-term retention: {config.get('long_term_hours')}h")
print(f" Check interval: {config.get('check_interval_minutes')}min")
if config.get("last_decay_check"):
print(f" Last cleanup: {config['last_decay_check']}")
except Exception as e:
print(f"Error getting memory statistics: {e}", file=sys.stderr)
return 1
return 0
[docs]
def memory_cleanup(args):
"""Clean up expired memory entries."""
try:
# Get backend from args or environment, default to redisstack for best performance
backend = getattr(args, "backend", None) or os.getenv("ORKA_MEMORY_BACKEND", "redisstack")
# Provide proper Redis URL based on backend
if backend == "redisstack":
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
else:
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
# Try RedisStack first for enhanced performance, fallback to Redis if needed
try:
memory = create_memory_logger(backend=backend, redis_url=redis_url)
except ImportError as e:
if backend == "redisstack":
print(f"⚠️ RedisStack not available ({e}), falling back to Redis", file=sys.stderr)
backend = "redis"
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
memory = create_memory_logger(backend=backend, redis_url=redis_url)
else:
raise
# Perform cleanup
if args.dry_run:
print("=== Dry Run: Memory Cleanup Preview ===")
else:
print("=== Memory Cleanup ===")
result = memory.cleanup_expired_memories(dry_run=args.dry_run)
# Display results
if args.json:
output = {"cleanup_result": result}
print(json.dumps(output, indent=2))
else:
print(f"Backend: {backend}")
print(f"Status: {result.get('status', 'completed')}")
print(f"Deleted Entries: {result.get('deleted_count', 0)}")
print(f"Streams Processed: {result.get('streams_processed', 0)}")
print(f"Total Entries Checked: {result.get('total_entries_checked', 0)}")
if result.get("error_count", 0) > 0:
print(f"Errors: {result['error_count']}")
if result.get("duration_seconds"):
print(f"Duration: {result['duration_seconds']:.2f}s")
if args.verbose and result.get("deleted_entries"):
print("\nDeleted Entries:")
for entry in result["deleted_entries"][:10]: # Show first 10
entry_desc = (
f"{entry.get('agent_id', 'unknown')} - {entry.get('event_type', 'unknown')}"
)
if "stream" in entry:
print(f" {entry['stream']}: {entry_desc}")
else:
print(f" {entry_desc}")
if len(result["deleted_entries"]) > 10:
print(f" ... and {len(result['deleted_entries']) - 10} more")
except Exception as e:
print(f"Error during memory cleanup: {e}", file=sys.stderr)
return 1
return 0
[docs]
def memory_configure(args):
"""Enhanced memory configuration with RedisStack testing."""
try:
backend = args.backend or os.getenv("ORKA_MEMORY_BACKEND", "redisstack")
# Provide proper Redis URL based on backend
if backend == "redisstack":
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
else:
redis_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
print("=== OrKa Memory Configuration Test ===")
print(f"Backend: {backend}")
# Test configuration
print("\n🧪 Testing Configuration:")
try:
memory = create_memory_logger(backend=backend, redis_url=redis_url)
# Basic decay config test
if hasattr(memory, "decay_config"):
config = memory.decay_config
print(
f"✅ Decay Config: {'Enabled' if config.get('enabled', False) else 'Disabled'}",
)
if config.get("enabled", False):
print(f" Short-term: {config.get('default_short_term_hours', 1.0)}h")
print(f" Long-term: {config.get('default_long_term_hours', 24.0)}h")
print(f" Check interval: {config.get('check_interval_minutes', 30)}min")
else:
print("⚠️ Decay Config: Not available")
# Backend-specific tests
if backend == "redisstack":
print("\n🔍 RedisStack-Specific Tests:")
# Test index availability
try:
if hasattr(memory, "client"):
memory.client.ft("enhanced_memory_idx").info()
print("✅ HNSW Index: Available")
# Get index details
index_info = memory.client.ft("enhanced_memory_idx").info()
print(f" Documents: {index_info.get('num_docs', 0)}")
print(
f" Indexing: {'Yes' if index_info.get('indexing', False) else 'No'}",
)
else:
print("⚠️ HNSW Index: Cannot test (no client access)")
except Exception as e:
print(f"❌ HNSW Index: Not available - {e}")
elif backend == "redis":
print("\n🔧 Redis-Specific Tests:")
# Test basic connectivity
try:
if hasattr(memory, "client"):
memory.client.ping()
print("✅ Redis Connection: Active")
else:
print("⚠️ Redis Connection: Cannot test")
except Exception as e:
print(f"❌ Redis Connection: Error - {e}")
# Test decay cleanup
try:
cleanup_result = memory.cleanup_expired_memories(dry_run=True)
print("✅ Decay Cleanup: Available")
print(f" Checked: {cleanup_result.get('total_entries_checked', 0)} entries")
except Exception as e:
print(f"❌ Decay Cleanup: Error - {e}")
elif backend == "kafka":
print("\n📡 Kafka-Specific Tests:")
# Test hybrid backend
try:
if hasattr(memory, "redis_url"):
print("✅ Hybrid Backend: Kafka + Redis")
print(f" Kafka topic: {getattr(memory, 'main_topic', 'N/A')}")
print(f" Redis URL: {memory.redis_url}")
else:
print("⚠️ Hybrid Backend: Configuration unclear")
except Exception as e:
print(f"❌ Hybrid Backend: Error - {e}")
# Test memory stats retrieval
try:
stats = memory.get_memory_stats()
print("\n✅ Memory Stats: Available")
print(f" Total entries: {stats.get('total_entries', 0)}")
print(f" Decay enabled: {stats.get('decay_enabled', False)}")
if stats.get("entries_by_memory_type"):
print(f" Memory types: {len(stats['entries_by_memory_type'])} categories")
except Exception as e:
print(f"\n❌ Memory Stats: Error - {e}")
print("\n✅ Configuration test completed")
except Exception as e:
print(f"❌ Configuration test failed: {e}")
return 1
except Exception as e:
print(f"❌ Error testing configuration: {e}", file=sys.stderr)
return 1
return 0