Source code for orka.cli.memory.commands
# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-reasoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-reasoning
"""
Memory CLI Commands
==================
This module contains CLI commands for memory management operations including
statistics, cleanup, and configuration.
"""
import json
import logging
import os
import sys
from typing import Any
from orka.memory_logger import create_memory_logger
logger = logging.getLogger(__name__)
[docs]
def memory_stats(args: Any) -> int:
"""Display memory usage statistics."""
try:
# Get backend from args or environment, default to redisstack for best performance
backend = getattr(args, "backend", None) or os.getenv("ORKA_MEMORY_BACKEND", "redisstack")
# Provide proper Redis URL based on backend
if backend == "redisstack":
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
else:
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
# Try RedisStack first for enhanced performance, fallback to Redis if needed
try:
memory = create_memory_logger(backend=str(backend), redis_url=redis_url)
except ImportError as e:
if backend == "redisstack":
logger.info(f"RedisStack not available ({e}), falling back to Redis")
backend = "redis"
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
memory = create_memory_logger(backend=backend, redis_url=redis_url)
else:
raise
# Get statistics
stats = memory.get_memory_stats()
# Display results
if args.json:
output = {"stats": stats}
logger.info(json.dumps(output, indent=2))
else:
logger.info("=== OrKa Memory Statistics ===")
logger.info(f"Backend: {stats.get('backend', backend)}")
logger.info(f"Decay Enabled: {stats.get('decay_enabled', False)}")
logger.info(f"Total Streams: {stats.get('total_streams', 0)}")
logger.info(f"Total Entries: {stats.get('total_entries', 0)}")
logger.info(f"Expired Entries: {stats.get('expired_entries', 0)}")
if stats.get("entries_by_type"):
logger.info("\nEntries by Type:")
for event_type, count in stats["entries_by_type"].items():
logger.info(f" {event_type}: {count}")
if stats.get("entries_by_memory_type"):
logger.info("\nEntries by Memory Type:")
for memory_type, count in stats["entries_by_memory_type"].items():
logger.info(f" {memory_type}: {count}")
if stats.get("entries_by_category"):
logger.info("\nEntries by Category:")
for category, count in stats["entries_by_category"].items():
if count > 0: # Only show categories with entries
logger.info(f" {category}: {count}")
if stats.get("decay_config"):
logger.info("\nDecay Configuration:")
config = stats["decay_config"]
logger.info(f" > Short-term retention: {config.get('short_term_hours')}h")
logger.info(f" > Long-term retention: {config.get('long_term_hours')}h")
logger.info(f" > Check interval: {config.get('check_interval_minutes')}min")
if config.get("last_decay_check"):
logger.info(f" > Last cleanup: {config['last_decay_check']}")
except Exception as e:
logger.error(f"Error getting memory statistics: {e}")
return 1
return 0
[docs]
def memory_cleanup(args: Any) -> int:
"""Clean up expired memory entries."""
try:
# Get backend from args or environment, default to redisstack for best performance
backend = getattr(args, "backend", None) or os.getenv("ORKA_MEMORY_BACKEND", "redisstack")
# Provide proper Redis URL based on backend
if backend == "redisstack":
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
else:
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
# Try RedisStack first for enhanced performance, fallback to Redis if needed
try:
memory = create_memory_logger(backend=str(backend), redis_url=redis_url)
except ImportError as e:
if backend == "redisstack":
logger.info(f"⚠️ RedisStack not available ({e}), falling back to Redis")
backend = "redis"
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
memory = create_memory_logger(backend=backend, redis_url=redis_url)
else:
raise
# Perform cleanup
if args.dry_run:
logger.info("=== Dry Run: Memory Cleanup Preview ===")
else:
logger.info("=== Memory Cleanup ===")
result = memory.cleanup_expired_memories(dry_run=args.dry_run)
# Display results
if args.json:
output = {"cleanup_result": result}
logger.info(json.dumps(output, indent=2))
else:
logger.info(f"Backend: {backend}")
logger.info(f"Status: {result.get('status', 'completed')}")
logger.info(f"Deleted Entries: {result.get('deleted_count', 0)}")
logger.info(f"Streams Processed: {result.get('streams_processed', 0)}")
logger.info(f"Total Entries Checked: {result.get('total_entries_checked', 0)}")
if result.get("error_count", 0) > 0:
logger.info(f"Errors: {result['error_count']}")
if result.get("duration_seconds"):
logger.info(f"Duration: {result['duration_seconds']:.2f}s")
if args.verbose and result.get("deleted_entries"):
logger.info("\nDeleted Entries:")
for entry in result["deleted_entries"][:10]: # Show first 10
entry_desc = (
f"{entry.get('agent_id', 'unknown')} - {entry.get('event_type', 'unknown')}"
)
if "stream" in entry:
logger.info(f" {entry['stream']}: {entry_desc}")
else:
logger.info(f" {entry_desc}")
if len(result["deleted_entries"]) > 10:
logger.info(f" ... and {len(result['deleted_entries']) - 10} more")
except Exception as e:
logger.error(f"Error during memory cleanup: {e}")
return 1
return 0
[docs]
def memory_configure(args: Any) -> int:
"""Enhanced memory configuration with RedisStack testing."""
try:
backend = args.backend or os.getenv("ORKA_MEMORY_BACKEND", "redisstack")
# Provide proper Redis URL based on backend
if backend == "redisstack":
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
else:
redis_url = os.getenv("REDIS_URL", "redis://localhost:6380/0")
logger.info("=== OrKa Memory Configuration Test ===")
logger.info(f"Backend: {backend}")
# Test configuration
logger.info("\n🧪 Testing Configuration:")
try:
memory = create_memory_logger(backend=backend, redis_url=redis_url)
# Basic decay config test
if hasattr(memory, "decay_config"):
config = memory.decay_config
logger.info(
f"✅ Decay Config: {'Enabled' if config.get('enabled', False) else 'Disabled'}",
)
if config.get("enabled", False):
logger.info(f" Short-term: {config.get('default_short_term_hours', 1.0)}h")
logger.info(f" Long-term: {config.get('default_long_term_hours', 24.0)}h")
logger.info(f" Check interval: {config.get('check_interval_minutes', 30)}min")
else:
logger.info("⚠️ Decay Config: Not available")
# Backend-specific tests
if backend == "redisstack":
logger.info("\n🔍 RedisStack-Specific Tests:")
# Test index availability
try:
if hasattr(memory, "client"):
memory.client.ft("enhanced_memory_idx").info()
logger.info("✅ HNSW Index: Available")
# Get index details
index_info = memory.client.ft("enhanced_memory_idx").info()
logger.info(f" Documents: {index_info.get('num_docs', 0)}")
logger.info(
f" Indexing: {'Yes' if index_info.get('indexing', False) else 'No'}",
)
else:
logger.info("⚠️ HNSW Index: Cannot test (no client access)")
except Exception as e:
logger.error(f"❌ HNSW Index: Not available - {e}")
elif backend == "redis":
logger.info("\n🔧 Redis-Specific Tests:")
# Test basic connectivity
try:
if hasattr(memory, "client"):
memory.client.ping()
logger.info("✅ Redis Connection: Active")
else:
logger.info("⚠️ Redis Connection: Cannot test")
except Exception as e:
logger.error(f"❌ Redis Connection: Error - {e}")
# Test decay cleanup
try:
cleanup_result = memory.cleanup_expired_memories(dry_run=True)
logger.info("✅ Decay Cleanup: Available")
logger.info(
f" Checked: {cleanup_result.get('total_entries_checked', 0)} entries"
)
except Exception as e:
logger.error(f"❌ Decay Cleanup: Error - {e}")
# Test memory stats retrieval
try:
stats = memory.get_memory_stats()
logger.info("\n✅ Memory Stats: Available")
logger.info(f" Total entries: {stats.get('total_entries', 0)}")
logger.info(f" Decay enabled: {stats.get('decay_enabled', False)}")
if stats.get("entries_by_memory_type"):
logger.info(
f" Memory types: {len(stats['entries_by_memory_type'])} categories"
)
except Exception as e:
logger.error(f"\n❌ Memory Stats: Error - {e}")
logger.info("\n✅ Configuration test completed")
except Exception as e:
logger.error(f"❌ Configuration test failed: {e}")
return 1
except Exception as e:
logger.error(f"❌ Error testing configuration: {e}")
return 1
return 0