Source code for orka.memory_logger

# OrKa: Orchestrator Kit Agents
# Copyright © 2025 Marco Somma
#
# This file is part of OrKa – https://github.com/marcosomma/orka-resoning
#
# Licensed under the Apache License, Version 2.0 (Apache 2.0).
# You may not use this file for commercial purposes without explicit permission.
#
# Full license: https://www.apache.org/licenses/LICENSE-2.0
# For commercial use, contact: marcosomma.work@gmail.com
#
# Required attribution: OrKa by Marco Somma – https://github.com/marcosomma/orka-resoning

"""
Memory Logger
=============

The Memory Logger is a critical component of the OrKa framework that provides
persistent storage and retrieval capabilities for orchestration events, agent outputs,
and system state. It serves as both a runtime memory system and an audit trail for
agent workflows.

**Modular Architecture**
    The memory logger is split into focused modules (base interface, backend
    implementations, serialization, file operations, and compression) while
    maintaining full backward compatibility through factory functions.

Key Features
------------

**Event Logging**
    Records all agent activities and system events with detailed metadata

**Data Persistence**
    Stores data in Redis streams or Kafka topics for reliability and durability

**Serialization**
    Handles conversion of complex Python objects to JSON-serializable formats
    with intelligent blob deduplication

**Error Resilience**
    Implements fallback mechanisms for handling serialization errors gracefully

**Querying**
    Provides methods to retrieve recent events and specific data points efficiently

**File Export**
    Supports exporting memory logs to files for analysis and backup

**Multiple Backends**
    Supports RedisStack, Redis, and Kafka backends with seamless switching

Core Use Cases
--------------

The Memory Logger is essential for:

* Enabling agents to access past context and outputs
* Debugging and auditing agent workflows
* Maintaining state across distributed components
* Supporting complex workflow patterns like fork/join
* Providing audit trails for compliance and analysis

Modular Components
------------------

The memory system is composed of specialized modules:

:class:`~orka.memory.base_logger.BaseMemoryLogger`
    Abstract base class defining the memory logger interface

:class:`~orka.memory.redis_logger.RedisMemoryLogger`
    Complete Redis backend implementation with streams and data structures

:class:`~orka.memory.kafka_logger.KafkaMemoryLogger`
    Kafka-based event streaming implementation

:class:`~orka.memory.serialization`
    JSON sanitization and memory processing utilities

:class:`~orka.memory.file_operations`
    Save/load functionality and file I/O operations

:class:`~orka.memory.compressor`
    Data compression utilities for efficient storage

Usage Examples
--------------

**Factory Function (Recommended)**

.. code-block:: python

    from orka.memory_logger import create_memory_logger

    # Redis backend
    redis_memory = create_memory_logger("redis", redis_url="redis://localhost:6379")

    # Kafka backend
    kafka_memory = create_memory_logger("kafka", bootstrap_servers="localhost:9092")
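
    # RedisStack backend (the factory default; parameter values here are illustrative)
    redisstack_memory = create_memory_logger(
        "redisstack",
        redis_url="redis://localhost:6379",
        enable_hnsw=True,
    )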

**Direct Instantiation**

.. code-block:: python

    from orka.memory.redis_logger import RedisMemoryLogger
    from orka.memory.kafka_logger import KafkaMemoryLogger

    # Redis logger
    redis_logger = RedisMemoryLogger(redis_url="redis://localhost:6379")

    # Kafka logger
    kafka_logger = KafkaMemoryLogger(bootstrap_servers="localhost:9092")
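
    # RedisStack logger (import path and parameters mirror what the factory passes;
    # the values shown are illustrative)
    from orka.memory.redisstack_logger import RedisStackMemoryLogger

    redisstack_logger = RedisStackMemoryLogger(
        redis_url="redis://localhost:6379",
        embedder=None,  # without an embedder, vector search is unavailable
        stream_key="orka:memory",
        debug_keep_previous_outputs=False,
        decay_config={
            "enabled": True,
            "default_short_term_hours": 1.0,
            "default_long_term_hours": 24.0,
            "check_interval_minutes": 30,
        },
        enable_hnsw=True,
        vector_params=None,
    )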

**Environment-Based Configuration**

.. code-block:: python

    import os
    from orka.memory_logger import create_memory_logger

    # Set backend via environment variable
    os.environ["ORKA_MEMORY_BACKEND"] = "kafka"

    # Logger will use Kafka automatically
    memory = create_memory_logger()

Backend Comparison
------------------

**Redis Backend**
    * **Best for**: Development, single-node deployments, quick prototyping
    * **Features**: Fast in-memory operations, simple setup, full feature support
    * **Limitations**: Single point of failure, memory-bound storage

**Kafka Backend**
    * **Best for**: Production, distributed systems, high-throughput scenarios
    * **Features**: Persistent event log, horizontal scaling, fault tolerance
    * **Limitations**: More complex setup, higher resource usage

Implementation Notes
--------------------

**Backward Compatibility**
    All existing code using ``RedisMemoryLogger`` continues to work unchanged
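
    For example, the legacy ``MemoryLogger`` name (defined at the end of this module
    as an alias of ``RedisMemoryLogger``) can still be imported and used directly:

    .. code-block:: python

        from orka.memory_logger import MemoryLogger

        # MemoryLogger is the same class as RedisMemoryLogger
        legacy_logger = MemoryLogger(redis_url="redis://localhost:6379")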

**Performance Optimizations**
    * Blob deduplication reduces storage overhead (see the sketch below)
    * In-memory buffers provide fast access to recent events
    * Batch operations improve throughput
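
    The deduplication idea, roughly: a large payload is stored once under its
    content hash, and each event references the hash instead of repeating the
    blob. A minimal sketch (the helper below is illustrative, not this module's
    actual API):

    .. code-block:: python

        import hashlib
        import json

        def store_blob_once(blob_store: dict, payload: dict) -> str:
            """Store a JSON payload once, keyed by its content hash (sketch)."""
            serialized = json.dumps(payload, sort_keys=True, default=str)
            blob_ref = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
            # Identical payloads hash to the same key, so the blob is kept once;
            # later events can refer to it by blob_ref.
            blob_store.setdefault(blob_ref, serialized)
            return blob_ref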

**Error Handling**
    * Robust sanitization handles non-serializable objects (sketched below)
    * Graceful degradation prevents workflow failures
    * Detailed error logging aids debugging
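
    The graceful-degradation idea can be sketched as follows (a hypothetical
    helper, not the actual ``orka.memory.serialization`` API):

    .. code-block:: python

        import json

        def sanitize_for_json(value):
            """Return value unchanged if JSON-serializable, else a string marker (sketch)."""
            try:
                json.dumps(value)
                return value
            except (TypeError, ValueError):
                # Non-serializable objects degrade to a descriptive string instead
                # of failing the whole workflow.
                return f"<non-serializable: {type(value).__name__}>"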

**Thread Safety**
    All memory logger implementations are thread-safe for concurrent access
"""

# Import all components from the new memory package
import logging
import os
from typing import Any, Dict, Optional

from .memory.base_logger import BaseMemoryLogger
from .memory.redis_logger import RedisMemoryLogger

logger = logging.getLogger(__name__)


def create_memory_logger(
    backend: str = "redisstack",
    redis_url: Optional[str] = None,
    bootstrap_servers: Optional[str] = None,
    topic_prefix: str = "orka-memory",
    stream_key: str = "orka:memory",
    debug_keep_previous_outputs: bool = False,
    decay_config: Optional[Dict[str, Any]] = None,
    enable_hnsw: bool = True,
    vector_params: Optional[Dict[str, Any]] = None,
    **kwargs,
) -> BaseMemoryLogger:
    """
    Enhanced factory with RedisStack as the primary backend.

    Creates a memory logger instance based on the specified backend. Defaults to
    RedisStack for optimal performance, with automatic fallback to basic Redis.

    Args:
        backend: Memory backend type ("redisstack", "redis", "kafka")
        redis_url: Redis connection URL
        bootstrap_servers: Kafka bootstrap servers (for Kafka backend)
        topic_prefix: Kafka topic prefix (for Kafka backend)
        stream_key: Redis stream key for logging
        debug_keep_previous_outputs: Whether to keep previous outputs in logs
        decay_config: Memory decay configuration
        enable_hnsw: Enable HNSW vector indexing (RedisStack only)
        vector_params: HNSW configuration parameters
        **kwargs: Additional parameters for backward compatibility

    Returns:
        Configured memory logger instance

    Raises:
        ImportError: If required dependencies are not available
        ConnectionError: If backend connection fails
    """
    # Normalize backend name
    backend = backend.lower()

    # Set default decay configuration if not provided
    if decay_config is None:
        decay_config = {
            "enabled": True,
            "default_short_term_hours": 1.0,
            "default_long_term_hours": 24.0,
            "check_interval_minutes": 30,
        }

    # ✅ Handle force basic Redis flag
    force_basic_redis = os.getenv("ORKA_FORCE_BASIC_REDIS", "false").lower() == "true"

    if force_basic_redis and backend in ["redis", "redisstack"]:
        # Force basic Redis when explicitly requested
        logging.getLogger(__name__).info("🔧 Force basic Redis mode enabled")
        try:
            from .memory.redis_logger import RedisMemoryLogger

            return RedisMemoryLogger(
                redis_url=redis_url,
                stream_key=stream_key,
                debug_keep_previous_outputs=debug_keep_previous_outputs,
                decay_config=decay_config,
            )
        except ImportError as e:
            raise ImportError(f"Basic Redis backend not available: {e}") from e

    # PRIORITY: Try RedisStack first for redis/redisstack backends
    if backend in ["redisstack", "redis"]:
        try:
            from .memory.redisstack_logger import RedisStackMemoryLogger

            # 🎯 CRITICAL: Initialize embedder for vector search
            embedder = None
            try:
                from .utils.embedder import get_embedder

                embedder = get_embedder()
                logger.info("✅ Embedder initialized for vector search")
            except Exception as e:
                logger.warning(f"⚠️ Could not initialize embedder: {e}")
                logger.warning("Vector search will not be available")

            logger_instance = RedisStackMemoryLogger(
                redis_url=redis_url,
                embedder=embedder,  # 🎯 NEW: Pass embedder for vector search
                stream_key=stream_key,
                debug_keep_previous_outputs=debug_keep_previous_outputs,
                decay_config=decay_config,
                enable_hnsw=enable_hnsw,
                vector_params=vector_params,
            )

            # Test RedisStack capabilities
            try:
                index_ready = logger_instance.ensure_index()
                if index_ready:
                    if embedder:
                        logging.getLogger(__name__).info(
                            "✅ RedisStack with HNSW and vector search enabled",
                        )
                    else:
                        logging.getLogger(__name__).info(
                            "✅ RedisStack with HNSW enabled (no vector search)",
                        )
                    return logger_instance
                else:
                    logging.getLogger(__name__).warning(
                        "⚠️ RedisStack index failed, falling back to basic Redis",
                    )
            except Exception as e:
                logging.getLogger(__name__).warning(f"RedisStack index test failed: {e}")
        except ImportError as e:
            logging.getLogger(__name__).warning(f"RedisStack not available: {e}")

    # Fallback to basic Redis only if RedisStack fails
    if backend == "redis" or (backend == "redisstack" and not force_basic_redis):
        try:
            from .memory.redis_logger import RedisMemoryLogger

            logging.getLogger(__name__).info("🔄 Using basic Redis backend")
            return RedisMemoryLogger(
                redis_url=redis_url,
                stream_key=stream_key,
                debug_keep_previous_outputs=debug_keep_previous_outputs,
                decay_config=decay_config,
            )
        except ImportError as e:
            if backend == "redisstack":
                raise ImportError(f"No Redis backends available: {e}") from e

    # Handle Kafka backend with RedisStack integration
    if backend == "kafka":
        try:
            from .memory.kafka_logger import KafkaMemoryLogger

            # ✅ CRITICAL: Use provided parameters or defaults
            kafka_bootstrap_servers = bootstrap_servers or os.getenv(
                "KAFKA_BOOTSTRAP_SERVERS",
                "localhost:9092",
            )

            return KafkaMemoryLogger(
                bootstrap_servers=kafka_bootstrap_servers,
                redis_url=redis_url,
                stream_key=stream_key,
                debug_keep_previous_outputs=debug_keep_previous_outputs,
                decay_config=decay_config,
                enable_hnsw=enable_hnsw,
                vector_params=vector_params,
            )
        except ImportError as e:
            logging.getLogger(__name__).warning(
                f"Kafka not available, falling back to RedisStack: {e}",
            )
            # Recursive call with RedisStack (keyword arguments keep the
            # parameters aligned with the factory signature)
            return create_memory_logger(
                "redisstack",
                redis_url=redis_url,
                stream_key=stream_key,
                debug_keep_previous_outputs=debug_keep_previous_outputs,
                decay_config=decay_config,
                enable_hnsw=enable_hnsw,
                vector_params=vector_params,
            )

    raise ValueError(f"Unsupported backend: {backend}. Supported: redisstack, redis, kafka")
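

# Example (illustrative sketch, not part of the public API): the
# ORKA_FORCE_BASIC_REDIS environment variable read above forces the plain Redis
# backend even when "redisstack" is requested:
#
#     os.environ["ORKA_FORCE_BASIC_REDIS"] = "true"
#     memory = create_memory_logger("redisstack", redis_url="redis://localhost:6379")
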
# Add MemoryLogger alias for backward compatibility with tests
MemoryLogger = RedisMemoryLogger