orka.memory package

Memory Package

The memory package provides persistent storage and retrieval capabilities for OrKa orchestration events, agent outputs, and system state. This package contains the modular architecture components for memory management with enhanced RedisStack support.

Package Overview

This package contains specialized components for different aspects of memory management:

Core Components

BaseMemoryLogger

Abstract base class defining the memory logger interface and common functionality

RedisMemoryLogger

Complete Redis backend implementation with Redis streams and data structures

RedisStackMemoryLogger

High-performance RedisStack backend with HNSW vector indexing for semantic search

Utility Mixins

SerializationMixin

JSON sanitization and memory processing utilities with blob deduplication

FileOperationsMixin

Save/load functionality and file I/O operations

CompressionMixin

Data compression utilities for efficient storage

Architecture Benefits

Separation of Concerns

Each component handles a specific aspect of memory management

Modular Design

Components can be mixed and matched as needed

Backend Flexibility

Easy to add new storage backends including RedisStack

Performance Optimization

Specialized components allow for targeted optimizations including HNSW indexing

Usage Patterns

Direct Usage

from orka.memory import RedisMemoryLogger, RedisStackMemoryLogger

# Standard Redis backend
redis_logger = RedisMemoryLogger(redis_url="redis://localhost:6380")

# High-performance RedisStack backend with HNSW
redisstack_logger = RedisStackMemoryLogger(
    redis_url="redis://localhost:6380",
    enable_hnsw=True,
    vector_params={"M": 16, "ef_construction": 200}
)

Through Factory Function (Recommended)

from orka.memory_logger import create_memory_logger

# Automatically selects appropriate backend
memory = create_memory_logger("redisstack")  # Uses HNSW indexing
memory = create_memory_logger("redis")       # Standard Redis

Custom Implementation

from orka.memory import BaseMemoryLogger, SerializationMixin

class CustomMemoryLogger(BaseMemoryLogger, SerializationMixin):
    # Implement custom storage backend
    pass

Modular Components

Available Modules:

  • base_logger - Abstract base class and common functionality

  • redis_logger - Redis backend implementation

  • redisstack_logger - RedisStack backend with HNSW vector indexing

  • serialization - JSON sanitization and processing utilities

  • file_operations - File I/O and export functionality

  • compressor - Data compression utilities

Performance Characteristics

RedisStack vs Redis Logger:

  • Vector Search: Up to 100x faster with HNSW indexing vs manual cosine similarity for large datasets

  • Scalability: O(log n) vs O(n) search complexity (HNSW vs brute force)

  • Memory Usage: ~60% reduction in memory overhead with optimized vector storage

  • Concurrent Operations: Support for 1000+ simultaneous searches in typical deployments
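
For context on the O(n) side of that comparison, here is a minimal sketch of the brute-force baseline: every stored vector is scored against the query with cosine similarity. The function names and the sample embeddings are illustrative assumptions, not part of the OrKa API.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either is all zeros."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def brute_force_search(query, vectors, top_k=3):
    """Score every stored vector against the query: one comparison per entry."""
    scored = [(key, cosine_similarity(query, vec)) for key, vec in vectors.items()]
    scored.sort(key=lambda item: item[1], reverse=True)
    return scored[:top_k]


# Hypothetical stored embeddings keyed by memory id.
stored = {
    "mem:a": [1.0, 0.0],
    "mem:b": [0.0, 1.0],
    "mem:c": [1.0, 1.0],
}
results = brute_force_search([1.0, 0.0], stored, top_k=2)
```

The cost of each query grows linearly with the number of stored vectors, which is the behavior an HNSW index is meant to avoid.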

Backward Compatibility

All components maintain compatibility with the original monolithic memory logger interface, so existing code continues to work without modification. The RedisStack logger provides enhanced performance while preserving the legacy API.

class orka.memory.BaseMemoryLogger(stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None, memory_preset: str | None = None)[source]

Bases: ABC, SerializationMixin, FileOperationsMixin

Base Memory Logger

Abstract base class that defines the interface and common functionality for all memory logger implementations in OrKa. This class provides the foundation for persistent memory storage across different backends.

Core Responsibilities

Interface Definition

  • Defines abstract methods that all memory backends must implement

  • Provides common initialization and configuration patterns

  • Establishes consistent behavior across different storage backends

Memory Lifecycle Management

  • Automatic memory decay based on configurable rules

  • Importance scoring for memory retention decisions

  • Memory type classification (short-term vs long-term)

  • Category-based memory organization (logs vs stored memories)

Data Optimization

  • Blob deduplication for large objects to reduce storage overhead

  • Serialization mixins for consistent data handling

  • File operation mixins for export/import functionality

  • Configurable thresholds for optimization decisions

Thread Safety

  • Thread-safe decay scheduling and management

  • Concurrent access patterns for multi-threaded environments

  • Proper resource cleanup and lifecycle management

Architecture Details

Memory Classification System

  • Categories: “log” (orchestration events) vs “stored” (persistent memories)

  • Types: “short_term” (temporary) vs “long_term” (persistent)

  • Importance Scoring: 0.0-1.0 scale based on event type and content

  • Decay Rules: Configurable retention policies per category/type

Blob Deduplication

  • SHA256 hashing for content identification

  • Reference counting for cleanup decisions

  • Configurable size threshold (default: 200 characters)

  • Automatic cleanup of unused blobs
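
The blob deduplication points above can be illustrated with a small standalone sketch. It is not OrKa's implementation: the `BlobStore` class, the `_blob_ref` marker key, and the choice to measure size on the JSON-serialized form are all assumptions made for the example.

```python
import hashlib
import json

BLOB_THRESHOLD = 200  # characters; mirrors the documented default


class BlobStore:
    """Hypothetical content-addressed store with reference counting."""

    def __init__(self, threshold=BLOB_THRESHOLD):
        self.threshold = threshold
        self.blobs = {}       # sha256 hex digest -> original object
        self.ref_counts = {}  # sha256 hex digest -> number of live references

    def put(self, obj):
        """Return obj unchanged if it is small, else a reference to a stored blob."""
        serialized = json.dumps(obj, sort_keys=True)
        if len(serialized) < self.threshold:
            return obj
        digest = hashlib.sha256(serialized.encode("utf-8")).hexdigest()
        self.blobs.setdefault(digest, obj)
        self.ref_counts[digest] = self.ref_counts.get(digest, 0) + 1
        return {"_blob_ref": digest}  # assumed marker key, for illustration only

    def release(self, ref):
        """Drop one reference and delete the blob once nothing points at it."""
        digest = ref["_blob_ref"]
        self.ref_counts[digest] -= 1
        if self.ref_counts[digest] == 0:
            del self.ref_counts[digest]
            del self.blobs[digest]


store = BlobStore()
large = {"response": "x" * 300}
first = store.put(large)          # stored once
second = store.put(large)         # same content, same reference
small = store.put({"ok": True})   # below threshold, returned as-is
```

Because the digest is computed over the serialized content, identical payloads logged by different agents collapse to a single stored blob.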

Decay Management

  • Background thread for automatic cleanup

  • Configurable check intervals (default: 30 minutes)

  • Dry-run support for testing cleanup operations

  • Graceful shutdown with proper thread cleanup

Implementation Requirements

Required Abstract Methods All concrete implementations must provide:

  • log() - Store orchestration events and memory entries

  • tail() - Retrieve recent entries for debugging

  • cleanup_expired_memories() - Remove expired entries

  • get_memory_stats() - Provide storage statistics

  • Redis-compatible methods: hset, hget, hkeys, hdel, get, set, delete

  • Set operations: smembers, sadd, srem

Optional Enhancements Implementations may provide:

  • Vector search capabilities for semantic similarity

  • Advanced filtering and querying options

  • Performance optimizations for specific use cases

  • Integration with external systems (Redis, etc.)

Configuration Options

Decay Configuration

decay_config = {
    "enabled": True,
    "default_short_term_hours": 1.0,
    "default_long_term_hours": 24.0,
    "check_interval_minutes": 30,
    "memory_type_rules": {
        "long_term_events": ["success", "completion", "write", "result"],
        "short_term_events": ["debug", "processing", "start", "progress"]
    },
    "importance_rules": {
        "base_score": 0.5,
        "event_type_boosts": {"write": 0.3, "success": 0.2},
        "agent_type_boosts": {"memory": 0.2, "openai-answer": 0.1}
    }
}
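
As a rough illustration of how the `importance_rules` block might be applied, the sketch below adds the matching boosts to the base score and clamps the result to the documented 0.0-1.0 scale. The additive combination and the `importance_score` helper are assumptions; the library's actual formula may differ.

```python
def importance_score(event_type, agent_type, rules):
    """Base score plus any matching boosts, clamped to the 0.0-1.0 range."""
    score = rules.get("base_score", 0.5)
    score += rules.get("event_type_boosts", {}).get(event_type, 0.0)
    score += rules.get("agent_type_boosts", {}).get(agent_type, 0.0)
    return max(0.0, min(1.0, score))


rules = {
    "base_score": 0.5,
    "event_type_boosts": {"write": 0.3, "success": 0.2},
    "agent_type_boosts": {"memory": 0.2, "openai-answer": 0.1},
}

boosted = importance_score("write", "memory", rules)   # both boosts apply
neutral = importance_score("debug", "router", rules)   # no boosts apply
```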

Blob Deduplication

  • _blob_threshold: Minimum size for deduplication (default: 200 chars)

  • Automatic reference counting and cleanup

  • SHA256 hashing for content identification

Usage Patterns

Implementing a Custom Backend

from orka.memory.base_logger import BaseMemoryLogger

class CustomMemoryLogger(BaseMemoryLogger):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._storage = {}  # Your storage implementation

    def log(self, agent_id, event_type, payload, **kwargs):
        # Implement storage logic
        pass

    def cleanup_expired_memories(self, dry_run=False):
        # Implement cleanup logic
        pass

    # ... implement other abstract methods
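
To give a feel for the Redis-compatible hash and set methods a custom backend has to supply, here is a dict-backed sketch. It is deliberately standalone (it does not subclass BaseMemoryLogger so that it runs without OrKa installed), and the `InMemoryStore` name and its return-value conventions are assumptions modeled on the signatures listed in this reference.

```python
class InMemoryStore:
    """Hypothetical dict-backed stand-in for the hash and set operations."""

    def __init__(self):
        self._hashes = {}  # hash name -> {field: value}
        self._sets = {}    # set name -> set of members

    def hset(self, name, key, value):
        fields = self._hashes.setdefault(name, {})
        is_new = key not in fields
        # Store everything as text, decoding bytes rather than repr()-ing them.
        fields[key] = value.decode("utf-8") if isinstance(value, bytes) else str(value)
        return 1 if is_new else 0

    def hget(self, name, key):
        return self._hashes.get(name, {}).get(key)

    def hkeys(self, name):
        return list(self._hashes.get(name, {}))

    def hdel(self, name, *keys):
        fields = self._hashes.get(name, {})
        removed = 0
        for key in keys:
            if key in fields:
                del fields[key]
                removed += 1
        return removed

    def sadd(self, name, *values):
        members = self._sets.setdefault(name, set())
        before = len(members)
        members.update(values)
        return len(members) - before

    def smembers(self, name):
        return sorted(self._sets.get(name, set()))

    def srem(self, name, *values):
        members = self._sets.get(name, set())
        before = len(members)
        members.difference_update(values)
        return before - len(members)
```

In a real subclass these methods would sit alongside log(), tail(), get(), set(), delete(), cleanup_expired_memories(), and get_memory_stats().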

Memory Classification Logic

  • Orchestration logs are always classified as short-term

  • Only “stored” memories can be classified as long-term

  • Importance scoring influences retention decisions

  • Event types and agent types affect classification
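
The classification rules above can be sketched as a small function. This is an assumption-laden simplification: the `classify_memory_type` helper is hypothetical, it ignores importance scores and agent types, and treating unlisted event types as short-term is a choice made for the example.

```python
def classify_memory_type(event_type, category, rules):
    """Return "short_term" or "long_term" for an entry."""
    # Orchestration logs never become long-term memories.
    if category != "stored":
        return "short_term"
    # Stored memories are promoted only for event types listed as long-term.
    if event_type in rules.get("long_term_events", []):
        return "long_term"
    return "short_term"


memory_type_rules = {
    "long_term_events": ["success", "completion", "write", "result"],
    "short_term_events": ["debug", "processing", "start", "progress"],
}

log_entry = classify_memory_type("success", "log", memory_type_rules)
stored_entry = classify_memory_type("success", "stored", memory_type_rules)
```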

Thread Safety Considerations

  • Decay scheduler runs in background thread

  • Proper synchronization for concurrent access

  • Graceful shutdown handling with stop events

  • Resource cleanup on object destruction
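
A background scheduler with a stop event, as described above, can be built from the standard library alone. The sketch below is one possible shape, not OrKa's code: the `DecayScheduler` class and its method names are hypothetical, and the interval is shortened so the example finishes quickly.

```python
import threading


class DecayScheduler:
    """Hypothetical periodic runner that calls a cleanup function until stopped."""

    def __init__(self, cleanup, interval_seconds):
        self._cleanup = cleanup
        self._interval = interval_seconds
        self._stop_event = threading.Event()
        self._thread = None

    def start(self):
        if self._thread is not None and self._thread.is_alive():
            return  # already running
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Event.wait returns False on timeout and True once stop is requested,
        # so the loop wakes up immediately on shutdown instead of sleeping.
        while not self._stop_event.wait(self._interval):
            self._cleanup()

    def stop(self, timeout=5.0):
        self._stop_event.set()
        if self._thread is not None:
            self._thread.join(timeout=timeout)
            self._thread = None


ran = threading.Event()
scheduler = DecayScheduler(cleanup=ran.set, interval_seconds=0.01)
scheduler.start()
ran.wait(timeout=2.0)  # block until the first cleanup pass has happened
scheduler.stop()
```

Waiting on the event rather than calling time.sleep is what makes shutdown prompt even with a 30-minute check interval.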

__init__(stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None, memory_preset: str | None = None) None[source]

Initialize the memory logger.

Parameters:
  • stream_key – Key for the memory stream. Defaults to “orka:memory”.

  • debug_keep_previous_outputs – If True, keeps previous_outputs in log files for debugging.

  • decay_config – Configuration for memory decay functionality.

  • memory_preset – Name of memory preset to use (sensory, working, episodic, semantic, procedural, meta). If provided, preset config is used as base and merged with decay_config.
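
One plausible reading of "preset config is used as base and merged with decay_config" is a recursive dictionary merge in which explicit settings win. The sketch below shows that reading only; whether the library merges deeply or shallowly is an assumption, and the preset values are invented for the example.

```python
import copy


def merge_config(base, overrides):
    """Return a new dict: base values, with overrides applied recursively."""
    merged = copy.deepcopy(base)
    for key, value in (overrides or {}).items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_config(merged[key], value)
        else:
            merged[key] = copy.deepcopy(value)
    return merged


# Hypothetical preset values, not OrKa's actual presets.
episodic_preset = {
    "enabled": True,
    "default_long_term_hours": 24.0,
    "importance_rules": {"base_score": 0.5},
}
decay_config = {
    "default_long_term_hours": 48.0,
    "importance_rules": {"event_type_boosts": {"write": 0.3}},
}
effective = merge_config(episodic_preset, decay_config)
```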

abstractmethod cleanup_expired_memories(dry_run: bool = False) dict[str, Any][source]

Clean up expired memory entries based on decay configuration.

Parameters:

dry_run – If True, return what would be deleted without actually deleting

Returns:

Dictionary containing cleanup statistics
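
The dry-run contract can be illustrated on an in-memory dictionary. The `cleanup_expired` function, the `expires_at` field, and the keys of the returned statistics are assumptions for the sketch; real backends define their own statistics layout.

```python
import time


def cleanup_expired(entries, now=None, dry_run=False):
    """Find entries whose expiry has passed; delete them unless dry_run is set."""
    now = time.time() if now is None else now
    expired = [key for key, entry in entries.items() if entry["expires_at"] <= now]
    if not dry_run:
        for key in expired:
            del entries[key]
    return {
        "dry_run": dry_run,
        "expired_keys": expired,
        "deleted_count": 0 if dry_run else len(expired),
        "remaining": len(entries),
    }


entries = {"a": {"expires_at": 10}, "b": {"expires_at": 100}}
preview = cleanup_expired(entries, now=50, dry_run=True)   # reports, deletes nothing
applied = cleanup_expired(entries, now=50, dry_run=False)  # actually removes "a"
```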

abstractmethod delete(*keys: str) int[source]

Delete keys.

abstractmethod get(key: str) str | None[source]

Get a value by key.

abstractmethod get_memory_stats() dict[str, Any][source]

Get memory usage statistics.

Returns:

Dictionary containing memory statistics

abstractmethod hdel(name: str, *keys: str) int[source]

Delete fields from a hash structure.

abstractmethod hget(name: str, key: str) str | None[source]

Get a field from a hash structure.

abstractmethod hkeys(name: str) list[str][source]

Get all keys in a hash structure.

abstractmethod hset(name: str, key: str, value: str | bytes | int | float) int[source]

Set a field in a hash structure.

abstractmethod log(agent_id: str, event_type: str, payload: dict[str, Any], step: int | None = None, run_id: str | None = None, fork_group: str | None = None, parent: str | None = None, previous_outputs: dict[str, Any] | None = None, agent_decay_config: dict[str, Any] | None = None, log_type: str = 'log') None[source]

Log an event to the memory backend.

abstractmethod sadd(name: str, *values: str) int[source]

Add members to a set.

save_enhanced_trace(file_path: str, enhanced_data: Dict[str, Any]) None[source]

Save enhanced trace data with memory backend references and blob deduplication.

abstractmethod set(key: str, value: str | bytes | int | float) bool[source]

Set a value by key.

abstractmethod smembers(name: str) list[str][source]

Get all members of a set.

abstractmethod srem(name: str, *values: str) int[source]

Remove members from a set.

stop_decay_scheduler()[source]

Stop the automatic decay scheduler.

abstractmethod tail(count: int = 10) list[dict[str, Any]][source]

Retrieve the most recent events.

class orka.memory.FileOperationsMixin[source]

Bases: object

Mixin class providing file operations for memory loggers.

static load_from_file(file_path: str, resolve_blobs: bool = True) Dict[str, Any][source]

Load and optionally resolve blob references from a deduplicated log file.

Parameters:
  • file_path – Path to the log file

  • resolve_blobs – If True, resolve blob references to original content

Returns:

Dictionary containing metadata, events, and optionally resolved content
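
Resolving blob references amounts to walking the loaded structure and swapping each reference for the content it points to. The sketch below assumes a hypothetical layout (a `blob_store` mapping, an `events` list, and a `_blob_ref` marker key); the actual file format written by save_to_file may use different names.

```python
def resolve_blobs(node, blob_store):
    """Return a copy of node with every blob reference replaced by its content."""
    if isinstance(node, dict):
        if set(node) == {"_blob_ref"}:
            # Leave the reference in place if the blob is missing.
            return blob_store.get(node["_blob_ref"], node)
        return {key: resolve_blobs(value, blob_store) for key, value in node.items()}
    if isinstance(node, list):
        return [resolve_blobs(item, blob_store) for item in node]
    return node


# "abc123" stands in for a real SHA256 digest.
document = {
    "blob_store": {"abc123": {"response": "long model output"}},
    "events": [
        {"agent_id": "a1", "payload": {"_blob_ref": "abc123"}},
        {"agent_id": "a2", "payload": {"result": "short"}},
    ],
}
resolved_events = resolve_blobs(document["events"], document["blob_store"])
```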

save_to_file(file_path: str) None[source]

Save the logged events to a JSON file with blob deduplication.

This method implements deduplication by:

  1. Replacing repeated JSON response blobs with SHA256 references

  2. Storing unique blobs once in a separate blob store

  3. Reducing file size by ~80% for typical workflows

  4. Meeting data minimization requirements

Parameters:

file_path – Path to the output JSON file.

class orka.memory.RedisMemoryLogger(redis_url: str | None = None, stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None, memory_preset: str | None = None)[source]

Bases: BaseMemoryLogger

🚀 High-performance memory engine - Redis-powered storage with intelligent decay.

What makes Redis memory special:

  • Lightning Speed: Sub-millisecond memory retrieval with 10,000+ writes/second

  • Intelligent Decay: Automatic expiration based on importance and content type

  • Semantic Search: Vector embeddings for context-aware memory retrieval

  • Namespace Isolation: Multi-tenant memory separation for complex applications

  • Stream Processing: Real-time memory updates with Redis Streams

Performance Characteristics:

  • Write Throughput: 10,000+ memories/second sustained

  • Read Latency: <50ms average search latency

  • Memory Efficiency: Automatic cleanup of expired memories

  • Scalability: Horizontal scaling with Redis Cluster support

  • Reliability: Persistence and replication for production workloads

Advanced Memory Features:

1. Intelligent Classification:

  • Automatic short-term vs long-term classification

  • Importance scoring based on content and context

  • Category separation (stored memories vs orchestration logs)

  • Custom decay rules per agent or memory type

2. Namespace Management:

```python
# Conversation memories
namespace: "user_conversations"
# → Stored in: orka:memory:user_conversations:session_id

# Knowledge base
namespace: "verified_facts"
# → Stored in: orka:memory:verified_facts:default

# Error tracking
namespace: "system_errors"
# → Stored in: orka:memory:system_errors:default
```

3. Memory Lifecycle:

  • Creation: Rich metadata with importance scoring

  • Storage: Efficient serialization with compression

  • Retrieval: Context-aware search with ranking

  • Expiration: Automatic cleanup based on decay rules
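
The key layout shown under Namespace Management (item 2) can be expressed as a one-line helper. The `memory_key` function is hypothetical; it only reproduces the `orka:memory:<namespace>:<session>` pattern from the examples, with `default` used when no session is given.

```python
def memory_key(namespace, session_id=None, prefix="orka:memory"):
    """Build a storage key following the prefix:namespace:session pattern."""
    return f"{prefix}:{namespace}:{session_id or 'default'}"


facts_key = memory_key("verified_facts")
chat_key = memory_key("user_conversations", session_id="abc123")
```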

Perfect for:

  • Real-time conversation systems requiring instant recall

  • High-throughput API services with memory requirements

  • Interactive applications with complex context management

  • Production AI systems with reliability requirements

Production Features:

  • Connection pooling for high concurrency

  • Graceful degradation for Redis unavailability

  • Comprehensive error handling and logging

  • Memory usage monitoring and alerts

  • Backup and restore capabilities

__del__()[source]

Cleanup when object is destroyed.

__init__(redis_url: str | None = None, stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None, memory_preset: str | None = None) None[source]

Initialize the Redis memory logger.

Parameters:
  • redis_url – URL for the Redis server. Defaults to the REDIS_URL environment variable, or to the redis service name if that variable is not set.

  • stream_key – Key for the Redis stream. Defaults to “orka:memory”.

  • debug_keep_previous_outputs – If True, keeps previous_outputs in log files for debugging.

  • decay_config – Configuration for memory decay functionality.

  • memory_preset – Name of memory preset (sensory, working, episodic, semantic, procedural, meta).

cleanup_expired_memories(dry_run: bool = False) dict[str, Any][source]

Clean up expired memory entries based on decay configuration.

Parameters:

dry_run – If True, return what would be deleted without actually deleting

Returns:

Dictionary containing cleanup statistics

close() None[source]

Close the Redis client connection.

delete(*keys: str) int[source]

Delete keys from Redis.

Parameters:

*keys – Keys to delete.

Returns:

Number of keys deleted.

get(key: str) str | None[source]

Get a value by key from Redis.

Parameters:

key – The key to get.

Returns:

Value if found, None otherwise.

get_memory_stats() dict[str, Any][source]

Get memory usage statistics.

Returns:

Dictionary containing memory statistics

hdel(name: str, *keys: str) int[source]

Delete fields from a Redis hash.

Parameters:
  • name – Name of the hash.

  • *keys – Keys to delete.

Returns:

Number of fields deleted.

hget(name: str, key: str) str | None[source]

Get a field from a Redis hash.

Parameters:
  • name – Name of the hash.

  • key – Field key.

Returns:

Field value.

hkeys(name: str) list[str][source]

Get all keys in a Redis hash.

Parameters:

name – Name of the hash.

Returns:

List of keys.

hset(name: str, key: str, value: Any) int[source]

Set a field in a Redis hash.

Parameters:
  • name – Name of the hash.

  • key – Field key.

  • value – Field value.

Returns:

Number of fields added.

log(agent_id: str, event_type: str, payload: dict[str, Any], step: int | None = None, run_id: str | None = None, fork_group: str | None = None, parent: str | None = None, previous_outputs: dict[str, Any] | None = None, agent_decay_config: dict[str, Any] | None = None, log_type: str = 'log') None[source]

Log an event to the Redis stream.

Parameters:
  • agent_id – ID of the agent generating the event.

  • event_type – Type of event.

  • payload – Event payload.

  • step – Execution step number.

  • run_id – Unique run identifier.

  • fork_group – Fork group identifier.

  • parent – Parent agent identifier.

  • previous_outputs – Previous agent outputs.

  • agent_decay_config – Agent-specific decay configuration overrides.

Raises:

ValueError – If agent_id is missing.

property redis: Redis

Return the Redis client for backward compatibility. This property exists for compatibility with existing code.

sadd(name: str, *values: str) int[source]

Add members to a Redis set.

Parameters:
  • name – Name of the set.

  • *values – Values to add.

Returns:

Number of new members added.

set(key: str, value: str | bytes | int | float) bool[source]

Set a value by key in Redis.

Parameters:
  • key – The key to set.

  • value – The value to set.

Returns:

True if successful, False otherwise.

smembers(name: str) list[str][source]

Get all members of a Redis set.

Parameters:

name – Name of the set.

Returns:

List of members.

srem(name: str, *values: str) int[source]

Remove members from a Redis set.

Parameters:
  • name – Name of the set.

  • *values – Values to remove.

Returns:

Number of members removed.

tail(count: int = 10) list[dict[str, Any]][source]

Retrieve the most recent events from the Redis stream.

Parameters:

count – Number of events to retrieve.

Returns:

List of recent events.

class orka.memory.RedisStackMemoryLogger(redis_url: str = 'redis://localhost:6380/0', index_name: str = 'orka_enhanced_memory', embedder=None, memory_decay_config: dict[str, Any] | None = None, stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None, memory_preset: str | None = None, enable_hnsw: bool = True, vector_params: dict[str, Any] | None = None, format_params: dict[str, Any] | None = None, **kwargs)[source]

Bases: BaseMemoryLogger

🚀 Ultra-high-performance memory engine - RedisStack-powered with HNSW vector indexing.

Revolutionary Performance:

  • Lightning Speed: Sub-millisecond vector searches with HNSW indexing

  • Massive Scale: Handle millions of memories with O(log n) complexity

  • Smart Filtering: Hybrid search combining vector similarity with metadata

  • Intelligent Decay: Automatic memory lifecycle management

  • Namespace Isolation: Multi-tenant memory separation

Performance Benchmarks:

  • Vector Search: Up to 100x faster than FLAT indexing for large datasets

  • Write Throughput: 50,000+ memories/second sustained (typical configurations)

  • Search Latency: <5ms for complex hybrid queries (optimized deployments)

  • Memory Efficiency: ~60% reduction in storage overhead vs unoptimized storage

  • Concurrent Users: 1000+ simultaneous search operations (typical deployments)

Advanced Vector Features:

1. HNSW Vector Indexing:

  • Hierarchical Navigable Small World algorithm

  • Configurable M and ef_construction parameters

  • Optimal for semantic similarity search

  • Automatic index optimization and maintenance

2. Hybrid Search Capabilities:

```python
# Vector similarity + metadata filtering
results = await memory.hybrid_search(
    query_vector=embedding,
    namespace="conversations",
    category="stored",
    similarity_threshold=0.8,
    ef_runtime=20  # Higher accuracy
)
```

3. Intelligent Memory Management:

  • Automatic expiration based on decay rules

  • Importance scoring for retention decisions

  • Category separation (stored vs logs)

  • Namespace-based multi-tenancy

4. Production-Ready Features:

  • Connection pooling and failover

  • Comprehensive monitoring and metrics

  • Graceful degradation capabilities

  • Migration tools for existing data
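
To make the idea of hybrid search (item 2) concrete, the sketch below filters candidates on metadata first and then ranks the survivors by cosine similarity against a threshold. It is a plain linear scan written for illustration, not the HNSW-backed query the logger runs; `hybrid_search_sketch`, the record fields, and the sample data are all assumptions.

```python
import math


def cosine(a, b):
    """Cosine similarity; 0.0 when either vector has zero length."""
    denom = math.hypot(*a) * math.hypot(*b)
    return sum(x * y for x, y in zip(a, b)) / denom if denom else 0.0


def hybrid_search_sketch(query_vector, memories, namespace=None, category=None,
                         similarity_threshold=0.8, num_results=10):
    """Apply metadata filters, then keep and rank sufficiently similar entries."""
    hits = []
    for memory in memories:
        if namespace is not None and memory["namespace"] != namespace:
            continue
        if category is not None and memory["category"] != category:
            continue
        score = cosine(query_vector, memory["vector"])
        if score >= similarity_threshold:
            hits.append({**memory, "score": score})
    hits.sort(key=lambda hit: hit["score"], reverse=True)
    return hits[:num_results]


# Hypothetical records; real entries carry more metadata.
memories = [
    {"id": "m1", "namespace": "conversations", "category": "stored", "vector": [1.0, 0.0]},
    {"id": "m2", "namespace": "conversations", "category": "log", "vector": [1.0, 0.0]},
    {"id": "m3", "namespace": "conversations", "category": "stored", "vector": [0.0, 1.0]},
    {"id": "m4", "namespace": "facts", "category": "stored", "vector": [1.0, 0.0]},
]
hits = hybrid_search_sketch([1.0, 0.1], memories,
                            namespace="conversations", category="stored")
```

Here m2 and m4 are removed by the metadata filters and m3 falls below the similarity threshold, leaving only m1.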

Perfect for:

  • Real-time AI applications requiring instant memory recall

  • High-throughput services with complex memory requirements

  • Multi-tenant SaaS platforms with memory isolation

  • Production systems requiring 99.9% uptime

__del__()[source]

Cleanup when the logger is destroyed.

__init__(redis_url: str = 'redis://localhost:6380/0', index_name: str = 'orka_enhanced_memory', embedder=None, memory_decay_config: dict[str, Any] | None = None, stream_key: str = 'orka:memory', debug_keep_previous_outputs: bool = False, decay_config: dict[str, Any] | None = None, memory_preset: str | None = None, enable_hnsw: bool = True, vector_params: dict[str, Any] | None = None, format_params: dict[str, Any] | None = None, **kwargs)[source]

Initialize the RedisStack memory logger.

Parameters:
  • redis_url – Redis connection URL. Defaults to redis://localhost:6380/0.

  • index_name – Name of the RedisStack index for vector search.

  • embedder – Optional embedder for vector search.

  • memory_decay_config – Configuration for memory decay functionality.

  • stream_key – Key for the Redis stream.

  • debug_keep_previous_outputs – If True, keeps previous_outputs in log files.

  • decay_config – Legacy decay configuration (use memory_decay_config instead).

  • memory_preset – Name of memory preset (sensory, working, episodic, semantic, procedural, meta).

  • enable_hnsw – Whether to enable HNSW vector indexing.

  • vector_params – HNSW configuration parameters.

  • **kwargs – Additional parameters for backward compatibility.

cleanup_connections() dict[str, Any][source]

Clean up connection resources.

cleanup_expired_memories(dry_run: bool = False) dict[str, Any][source]

Clean up expired memories using connection pool.

clear_all_memories()[source]

Clear all memories from the RedisStack storage.

close()[source]

Clean up resources.

delete(*keys: str) int[source]

Delete keys.

delete_memory(key: str) bool[source]

Delete a specific memory entry.

ensure_index() bool[source]

Ensure the enhanced memory index exists - for factory compatibility.

get(key: str) str | None[source]

Get a value by key.

get_all_memories(trace_id: str | None = None) list[dict[str, Any]][source]

Get all memories, optionally filtered by trace_id.

get_connection_stats() dict[str, Any][source]

Get connection pool statistics for monitoring.

get_memory_stats() dict[str, Any][source]

Get comprehensive memory storage statistics.

get_performance_metrics() dict[str, Any][source]

Get RedisStack performance metrics including vector search status.

get_recent_stored_memories(count: int = 5) list[dict[str, Any]][source]

Get recent stored memories (log_type='memory' only), sorted by timestamp.

hdel(name: str, *keys: str) int[source]

Delete fields from a hash structure.

hget(name: str, key: str) str | None[source]

Get a field from a hash structure.

hkeys(name: str) list[str][source]

Get all keys in a hash structure.

hset(name: str, key: str, value: str | bytes | int | float) int[source]

Set a field in a hash structure.

log(agent_id: str, event_type: str, payload: dict[str, Any], step: int | None = None, run_id: str | None = None, fork_group: str | None = None, parent: str | None = None, previous_outputs: dict[str, Any] | None = None, agent_decay_config: dict[str, Any] | None = None, log_type: str = 'log') None[source]

Log an orchestration event as a memory entry.

This method converts orchestration events into memory entries for storage.

log_memory(content: str, node_id: str, trace_id: str, metadata: dict[str, Any] | None = None, importance_score: float = 1.0, memory_type: str = 'short_term', expiry_hours: float | None = None) str[source]

Store memory with vector embedding for enhanced search.

property redis

Backward compatibility property for redis client access.

sadd(name: str, *values: str) int[source]

Add members to a set.

search_memories(query: str, num_results: int = 10, trace_id: str | None = None, node_id: str | None = None, memory_type: str | None = None, min_importance: float | None = None, log_type: str = 'memory', namespace: str | None = None) list[dict[str, Any]][source]

Search memories using enhanced vector search with filtering.

Parameters:
  • query – Search query text

  • num_results – Maximum number of results

  • trace_id – Filter by trace ID

  • node_id – Filter by node ID

  • memory_type – Filter by memory type

  • min_importance – Minimum importance score

  • log_type – Filter by log type. Defaults to 'memory'.

  • namespace – Filter by namespace

Returns:

List of matching memory entries with scores

set(key: str, value: str | bytes | int | float) bool[source]

Set a value by key.

smembers(name: str) list[str][source]

Get all members of a set.

srem(name: str, *values: str) int[source]

Remove members from a set.

tail(count: int = 10) list[dict[str, Any]][source]

Get recent memory entries.

class orka.memory.SerializationMixin[source]

Bases: object

Mixin class providing JSON serialization capabilities for memory loggers.

Submodules