Source code for orka.memory_logger

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-reasoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-reasoning

"""
Memory Logger
=============

The Memory Logger is a critical component of the OrKa framework that provides
persistent storage and retrieval capabilities for orchestration events, agent outputs,
and system state. It serves as both a runtime memory system and an audit trail for
agent workflows.

**Modular Architecture**
    The memory logger features a modular architecture with focused components
    while maintaining 100% backward compatibility through factory functions.

Key Features
------------

**Event Logging**
    Records all agent activities and system events with detailed metadata

**Data Persistence**
    Stores data in Redis streams with reliability and durability

**Serialization**
    Handles conversion of complex Python objects to JSON-serializable formats
    with intelligent blob deduplication

**Error Resilience**
    Implements fallback mechanisms for handling serialization errors gracefully

**Querying**
    Provides methods to retrieve recent events and specific data points efficiently (see the sketch after this feature list)

**File Export**
    Supports exporting memory logs to files for analysis and backup

**RedisStack Backend**
    High-performance RedisStack backend with HNSW vector indexing for semantic search
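
The features above translate into a small logging and query surface. The sketch
below is illustrative only: the construction call mirrors the Usage Examples
section, but the method names (``log``, ``tail``, ``save_to_file``) and their
parameters are assumptions drawn from the feature descriptions, not confirmed
signatures.

.. code-block:: python

    from orka.memory_logger import create_memory_logger

    memory = create_memory_logger("redisstack", redis_url="redis://localhost:6380")

    # Record an agent event with structured metadata (assumed signature)
    memory.log(
        agent_id="router_1",
        event_type="agent.completed",
        payload={"answer": "42", "confidence": 0.93},
    )

    # Retrieve the most recent events for debugging and auditing (assumed signature)
    recent_events = memory.tail(10)

    # Export the collected log for offline analysis (assumed signature)
    memory.save_to_file("logs/orka_trace.json")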

Core Use Cases
==============

The Memory Logger is essential for:

* Enabling agents to access past context and outputs
* Debugging and auditing agent workflows
* Maintaining state across distributed components
* Supporting complex workflow patterns like fork/join
* Providing audit trails for compliance and analysis

Modular Components
------------------

The memory system is composed of specialized modules:

:class:`~orka.memory.base_logger.BaseMemoryLogger`
    Abstract base class defining the memory logger interface

:class:`~orka.memory.redis_logger.RedisMemoryLogger`
    Complete Redis backend implementation with streams and data structures

:class:`~orka.memory.redisstack_logger.RedisStackMemoryLogger`
    High-performance RedisStack backend with HNSW vector indexing

:class:`~orka.memory.serialization`
    JSON sanitization and memory processing utilities

:class:`~orka.memory.file_operations`
    Save/load functionality and file I/O operations

:class:`~orka.memory.compressor`
    Data compression utilities for efficient storage

Usage Examples
==============

**Factory Function (Recommended)**

.. code-block:: python

    from orka.memory_logger import create_memory_logger

    # RedisStack backend (default - recommended)
    redisstack_memory = create_memory_logger("redisstack", redis_url="redis://localhost:6380")

    # Basic Redis backend
    redis_memory = create_memory_logger("redis", redis_url="redis://localhost:6380")

**Direct Instantiation**

.. code-block:: python

    from orka.memory.redis_logger import RedisMemoryLogger
    from orka.memory.redisstack_logger import RedisStackMemoryLogger

    # Redis logger
    redis_logger = RedisMemoryLogger(redis_url="redis://localhost:6380")

    # RedisStack logger with HNSW
    redisstack_logger = RedisStackMemoryLogger(redis_url="redis://localhost:6380")

**Environment-Based Configuration**

.. code-block:: python

    import os
    from orka.memory_logger import create_memory_logger

    # Set backend via environment variable
    os.environ["ORKA_MEMORY_BACKEND"] = "redisstack"

    # Logger will use RedisStack automatically
    memory = create_memory_logger()

Backend Comparison
------------------

**RedisStack Backend (Recommended)**
    * **Best for**: Production AI workloads, high-performance applications
    * **Features**: HNSW vector indexing, vector search up to 100x faster than the basic Redis backend, advanced memory management
    * **Performance**: Sub-millisecond search, 50,000+ operations/second

**Redis Backend (Legacy)**
    * **Best for**: Development, single-node deployments, quick prototyping
    * **Features**: Fast in-memory operations, simple setup, full feature support
    * **Limitations**: Basic search capabilities, no vector indexing

Implementation Notes
--------------------

**Backward Compatibility**
    All existing code using ``RedisMemoryLogger`` continues to work unchanged

**Performance Optimizations**
    * Blob deduplication reduces storage overhead (see the sketch below)
    * In-memory buffers provide fast access to recent events
    * Batch operations improve throughput
    * HNSW indexing for ultra-fast vector search
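
The blob-deduplication idea above can be pictured as content-addressed storage:
a large payload is hashed, stored once under its hash, and identical payloads
only add another reference to that hash. This is a simplified sketch of the
concept, not the module's actual implementation; the helper name and the
SHA-256 choice are illustrative assumptions.

.. code-block:: python

    import hashlib
    import json

    _blob_store: dict[str, str] = {}  # hash -> serialized blob (stand-in for Redis)

    def _store_blob(payload: dict) -> str:
        """Store a payload once, keyed by its content hash, and return the key."""
        serialized = json.dumps(payload, sort_keys=True, default=str)
        blob_key = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        _blob_store.setdefault(blob_key, serialized)  # write only if unseen
        return blob_key

    # Two identical payloads produce one stored blob and two references to it.
    ref_a = _store_blob({"result": "x" * 10_000})
    ref_b = _store_blob({"result": "x" * 10_000})
    assert ref_a == ref_b and len(_blob_store) == 1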

**Error Handling**
    * Robust sanitization handles non-serializable objects (see the sketch below)
    * Graceful degradation prevents workflow failures
    * Detailed error logging aids debugging
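
A minimal sketch of the sanitization idea described above: values that JSON
cannot encode are degraded to a string placeholder instead of failing the
whole log entry. The helper name and the placeholder format are illustrative
assumptions, not the module's actual implementation.

.. code-block:: python

    import json
    from typing import Any

    def _sanitize(value: Any) -> Any:
        """Return a JSON-serializable version of value, degrading gracefully."""
        try:
            json.dumps(value)
            return value
        except (TypeError, ValueError):
            pass
        if isinstance(value, dict):
            return {str(k): _sanitize(v) for k, v in value.items()}
        if isinstance(value, (list, tuple, set)):
            return [_sanitize(v) for v in value]
        # Fall back to a descriptive placeholder rather than raising
        return f"<non-serializable: {type(value).__name__}>"

    print(_sanitize({"ok": 1, "bad": object(), "nested": [1, {2, 3}]}))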

**Thread Safety**
    All memory logger implementations are thread-safe for concurrent access
"""

# Import the core memory components from the modular memory package
import logging
import os
from typing import Any

from .memory.base_logger import BaseMemoryLogger
from .memory.redis_logger import RedisMemoryLogger

logger = logging.getLogger(__name__)


def apply_memory_preset_to_config(
    config: dict[str, Any],
    memory_preset: str | None = None,
    operation: str | None = None,
) -> dict[str, Any]:
    """
    Apply memory preset operation-specific defaults to a configuration dictionary.

    This function intelligently merges memory preset defaults based on the operation
    type (read/write) with the provided configuration, allowing users to specify just
    a preset and have appropriate defaults applied automatically.

    Args:
        config: Base configuration dictionary
        memory_preset: Name of the memory preset (sensory, working, episodic,
            semantic, procedural, meta)
        operation: Memory operation type ('read' or 'write')

    Returns:
        Enhanced configuration with preset defaults applied

    Example:
        >>> config = {"operation": "read", "namespace": "test"}
        >>> enhanced = apply_memory_preset_to_config(config, "episodic", "read")
        >>> # Returns config with episodic read defaults like similarity_threshold, vector_weight, etc.
    """
    if not memory_preset or not operation:
        return config

    try:
        from .memory.presets import get_operation_defaults

        # Get operation-specific defaults for the preset
        operation_defaults = get_operation_defaults(memory_preset, operation)

        # Apply defaults for any missing keys in config
        enhanced_config = config.copy()
        for key, default_value in operation_defaults.items():
            if key not in enhanced_config:
                enhanced_config[key] = default_value

        logger.debug(f"Applied {memory_preset}_{operation} preset defaults to config")
        return enhanced_config

    except ImportError:
        logger.warning("Memory presets not available, using config as-is")
        return config
    except Exception as e:
        logger.error(f"Failed to apply preset '{memory_preset}' for operation '{operation}': {e}")
        return config


def create_memory_logger(
    backend: str = "redisstack",
    redis_url: str | None = None,
    stream_key: str = "orka:memory",
    debug_keep_previous_outputs: bool = False,
    decay_config: dict[str, Any] | None = None,
    memory_preset: str | None = None,
    operation: str | None = None,  # NEW: Support for operation-aware presets
    enable_hnsw: bool = True,
    vector_params: dict[str, Any] | None = None,
    format_params: dict[str, Any] | None = None,
    index_name: str = "orka_enhanced_memory",
    vector_dim: int = 384,
    force_recreate_index: bool = False,
    **kwargs,
) -> BaseMemoryLogger:
    """
    Enhanced factory with RedisStack as primary backend.

    Creates a memory logger instance based on the specified backend. Defaults to
    RedisStack for optimal performance with automatic fallback.

    Args:
        backend: Memory backend type ("redisstack", "redis")
        redis_url: Redis connection URL
        stream_key: Redis stream key for logging
        debug_keep_previous_outputs: Whether to keep previous outputs in logs
        decay_config: Memory decay configuration
        memory_preset: Memory preset name (sensory, working, episodic, semantic,
            procedural, meta)
        enable_hnsw: Enable HNSW vector indexing (RedisStack only)
        vector_params: HNSW configuration parameters
        format_params: Content formatting parameters (e.g., newline handling, custom filters)
        index_name: Name of the RedisStack index for vector search
        vector_dim: Dimension of vector embeddings
        force_recreate_index: Whether to force recreate the index if it exists but is misconfigured
        **kwargs: Additional parameters for backward compatibility

    Returns:
        Configured memory logger instance

    Raises:
        ImportError: If required dependencies are not available
        ConnectionError: If backend connection fails

    Notes:
        All parameters can be configured through YAML configuration. Vector
        parameters can be specified in detail through the vector_params dictionary.
    """
    # Normalize backend name
    backend = backend.lower()

    # Set default decay configuration if not provided
    if decay_config is None:
        decay_config = {
            "enabled": True,
            "default_short_term_hours": 1.0,
            "default_long_term_hours": 24.0,
            "check_interval_minutes": 30,
        }

    # Handle force basic Redis flag
    force_basic_redis = os.getenv("ORKA_FORCE_BASIC_REDIS", "false").lower() == "true"

    if force_basic_redis and backend in ["redis", "redisstack"]:
        # Force basic Redis when explicitly requested
        logging.getLogger(__name__).info("🔧 Force basic Redis mode enabled")
        try:
            from .memory.redis_logger import RedisMemoryLogger

            return RedisMemoryLogger(
                redis_url=redis_url,
                stream_key=stream_key,
                debug_keep_previous_outputs=debug_keep_previous_outputs,
                decay_config=decay_config,
                memory_preset=memory_preset,
            )
        except ImportError as e:
            raise ImportError(f"Basic Redis backend not available: {e}") from e

    # PRIORITY: Try RedisStack first for redis/redisstack backends
    if backend in ["redisstack", "redis"]:
        try:
            from .memory.redisstack_logger import RedisStackMemoryLogger

            # Initialize embedder for vector search
            embedder = None
            try:
                from .utils.embedder import get_embedder

                embedder = get_embedder()
                logger.info("✅ Embedder initialized for vector search")
            except Exception as e:
                logger.warning(f"⚠️ Could not initialize embedder: {e}")
                logger.warning("Vector search will not be available")

            # Prepare vector params with additional configuration
            effective_vector_params = vector_params or {}

            # Add force_recreate to vector params if specified
            if force_recreate_index:
                effective_vector_params["force_recreate"] = True

            logger.info("✅ RedisStack with HNSW and vector search enabled")
            return RedisStackMemoryLogger(
                redis_url=redis_url or "redis://localhost:6380/0",
                index_name=index_name,
                embedder=embedder,
                memory_decay_config=decay_config,
                stream_key=stream_key,
                debug_keep_previous_outputs=debug_keep_previous_outputs,
                decay_config=decay_config,
                memory_preset=memory_preset,
                enable_hnsw=enable_hnsw,
                vector_params=effective_vector_params,
                format_params=format_params,
                vector_dim=vector_dim,
                force_recreate_index=force_recreate_index,
                **kwargs,
            )

        except ImportError as e:
            # Fall back to basic Redis if RedisStack is not available
            logger.warning(f"RedisStack not available, falling back to basic Redis: {e}")
            try:
                from .memory.redis_logger import RedisMemoryLogger

                return RedisMemoryLogger(
                    redis_url=redis_url,
                    stream_key=stream_key,
                    debug_keep_previous_outputs=debug_keep_previous_outputs,
                    decay_config=decay_config,
                    memory_preset=memory_preset,
                )
            except ImportError as e:
                raise ImportError(f"No Redis backends available: {e}") from e

    raise ValueError(f"Unsupported backend: {backend}. Supported: redisstack, redis")


# Add MemoryLogger alias for backward compatibility with tests
MemoryLogger = RedisMemoryLogger